Skip to main content

reth_rpc_eth_types/cache/
mod.rs

1//! Async caching support for eth RPC
2
3use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eip7928::bal::DecodedBal;
7use alloy_eips::BlockHashOrNumber;
8use alloy_primitives::{Address, TxHash, B256};
9use futures::{stream::FuturesOrdered, Stream, StreamExt};
10use reth_chain_state::CanonStateNotification;
11use reth_errors::{ProviderError, ProviderResult};
12use reth_execution_types::Chain;
13use reth_primitives_traits::{Block, BlockBody, InMemorySize, NodePrimitives, RecoveredBlock};
14use reth_revm::{
15    bytecode::Bytecode,
16    primitives::{StorageKey, StorageValue},
17    state::bal::{
18        AccountBal as RevmAccountBal, AccountInfoBal as RevmAccountInfoBal, Bal as RevmBal,
19        BalWrites as RevmBalWrites, StorageBal as RevmStorageBal,
20    },
21};
22use reth_storage_api::{BalProvider, BlockReader, TransactionVariant};
23use reth_tasks::Runtime;
24use schnellru::{ByLength, Limiter, LruMap};
25use std::{
26    future::Future,
27    pin::Pin,
28    sync::Arc,
29    task::{Context, Poll},
30};
31use tokio::sync::{
32    mpsc::{unbounded_channel, UnboundedSender},
33    oneshot, Semaphore,
34};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36
37pub mod config;
38pub mod db;
39pub mod metrics;
40pub mod multi_consumer;
41
42/// The type that can send the response to a requested [`RecoveredBlock`]
43type BlockWithSendersResponseSender<B> =
44    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
45
46/// The type that can send the response to the requested receipts of a block.
47type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
48
49type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;
50
51type CachedBlockAndReceiptsResponseSender<B, R> =
52    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;
53
54/// The type that can send the response to a requested header
55type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
56
57/// The type that can send the response with a chain of cached blocks
58type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
59
60/// The type that can send the response for a transaction hash lookup
61type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;
62
63/// The type that can send the response to a requested revm BAL.
64type BalResponseSender = oneshot::Sender<ProviderResult<Option<CachedRevmBal>>>;
65
66type BlockLruCache<B, L> =
67    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;
68
69type ReceiptsLruCache<R, L> =
70    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
71
72type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
73
74type BalLruCache<L> = MultiConsumerLruCache<B256, CachedRevmBal, L, BalResponseSender>;
75
76/// Provides async access to cached eth data
77///
78/// This is the frontend for the async caching service which manages cached data on a different
79/// task.
80#[derive(Debug)]
81pub struct EthStateCache<N: NodePrimitives> {
82    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
83}
84
85impl<N: NodePrimitives> Clone for EthStateCache<N> {
86    fn clone(&self) -> Self {
87        Self { to_service: self.to_service.clone() }
88    }
89}
90
91impl<N: NodePrimitives> EthStateCache<N> {
92    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
93    fn create<Provider>(
94        provider: Provider,
95        action_task_spawner: Runtime,
96        config: EthStateCacheConfig,
97    ) -> (Self, EthStateCacheService<Provider, Runtime>)
98    where
99        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + BalProvider,
100    {
101        let EthStateCacheConfig {
102            max_blocks,
103            max_receipts,
104            max_headers,
105            max_bals,
106            max_concurrent_db_requests,
107            max_cached_tx_hashes,
108        } = config;
109        let (to_service, rx) = unbounded_channel();
110
111        let service = EthStateCacheService {
112            provider,
113            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
114            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
115            headers_cache: HeaderLruCache::new(max_headers, "headers"),
116            bal_cache: BalLruCache::new(max_bals, "bals"),
117            action_tx: to_service.clone(),
118            action_rx: UnboundedReceiverStream::new(rx),
119            action_task_spawner,
120            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_requests)),
121            tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
122        };
123        let cache = Self { to_service };
124        (cache, service)
125    }
126
127    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
128    /// spawner.
129    ///
130    /// The cache is memory limited by the given max bytes values.
131    pub fn spawn_with<Provider>(
132        provider: Provider,
133        config: EthStateCacheConfig,
134        executor: Runtime,
135    ) -> Self
136    where
137        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>
138            + BalProvider
139            + Clone
140            + Unpin
141            + 'static,
142    {
143        let (this, service) = Self::create(provider, executor.clone(), config);
144        executor.spawn_critical_task("eth state cache", service);
145        this
146    }
147
148    /// Requests the  [`RecoveredBlock`] for the block hash
149    ///
150    /// Returns `None` if the block does not exist.
151    pub async fn get_recovered_block(
152        &self,
153        block_hash: B256,
154    ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
155        let (response_tx, rx) = oneshot::channel();
156        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
157        rx.await.map_err(|_| CacheServiceUnavailable)?
158    }
159
160    /// Requests the receipts for the block hash
161    ///
162    /// Returns `None` if the block was not found.
163    pub async fn get_receipts(
164        &self,
165        block_hash: B256,
166    ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
167        let (response_tx, rx) = oneshot::channel();
168        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
169        rx.await.map_err(|_| CacheServiceUnavailable)?
170    }
171
172    /// Fetches both receipts and block for the given block hash.
173    pub async fn get_block_and_receipts(
174        &self,
175        block_hash: B256,
176    ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
177        let block = self.get_recovered_block(block_hash);
178        let receipts = self.get_receipts(block_hash);
179
180        let (block, receipts) = futures::try_join!(block, receipts)?;
181
182        Ok(block.zip(receipts))
183    }
184
185    /// Retrieves receipts and blocks from cache if block is in the cache, otherwise only receipts.
186    pub async fn get_receipts_and_maybe_block(
187        &self,
188        block_hash: B256,
189    ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
190        let (response_tx, rx) = oneshot::channel();
191        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
192
193        let receipts = self.get_receipts(block_hash);
194
195        let (receipts, block) = futures::join!(receipts, rx);
196
197        let block = block.map_err(|_| CacheServiceUnavailable)?;
198        Ok(receipts?.map(|r| (r, block)))
199    }
200
201    /// Retrieves both block and receipts from cache if available.
202    pub async fn maybe_cached_block_and_receipts(
203        &self,
204        block_hash: B256,
205    ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
206        let (response_tx, rx) = oneshot::channel();
207        let _ = self
208            .to_service
209            .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
210        rx.await.map_err(|_| CacheServiceUnavailable.into())
211    }
212
213    /// Streams cached receipts and blocks for a list of block hashes, preserving input order.
214    #[expect(clippy::type_complexity)]
215    pub fn get_receipts_and_maybe_block_stream<'a>(
216        &'a self,
217        hashes: Vec<B256>,
218    ) -> impl Stream<
219        Item = ProviderResult<
220            Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
221        >,
222    > + 'a {
223        let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
224
225        futures.collect::<FuturesOrdered<_>>()
226    }
227
228    /// Requests the header for the given hash.
229    ///
230    /// Returns an error if the header is not found.
231    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
232        let (response_tx, rx) = oneshot::channel();
233        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
234        rx.await.map_err(|_| CacheServiceUnavailable)?
235    }
236
237    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
238    /// and traversing down through parent hashes. Returns blocks in descending order (newest
239    /// first).
240    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
241    /// cache without making separate database requests.
242    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
243    /// with at least one block.
244    pub async fn get_cached_parent_blocks(
245        &self,
246        block_hash: B256,
247        max_blocks: usize,
248    ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
249        let (response_tx, rx) = oneshot::channel();
250        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
251            block_hash,
252            max_blocks,
253            response_tx,
254        });
255
256        let blocks = rx.await.unwrap_or_default();
257        if blocks.is_empty() {
258            None
259        } else {
260            Some(blocks)
261        }
262    }
263
264    /// Looks up a transaction by its hash in the cache index.
265    ///
266    /// Returns the cached block, transaction index, and optionally receipts if the transaction
267    /// is in a cached block.
268    pub async fn get_transaction_by_hash(
269        &self,
270        tx_hash: TxHash,
271    ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
272        let (response_tx, rx) = oneshot::channel();
273        let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
274        rx.await.ok()?
275    }
276
277    /// Requests the revm BAL for the block hash.
278    ///
279    /// Returns `None` if the BAL does not exist.
280    pub async fn get_bal(
281        &self,
282        block_hash: B256,
283    ) -> ProviderResult<Option<Arc<DecodedBal<RevmBal>>>> {
284        let (response_tx, rx) = oneshot::channel();
285        let _ = self.to_service.send(CacheAction::GetBal { block_hash, response_tx });
286        rx.await
287            .map_err(|_| CacheServiceUnavailable)?
288            .map(|maybe_bal| maybe_bal.map(|cached| cached.0))
289    }
290}
291/// Thrown when the cache service task dropped.
292#[derive(Debug, thiserror::Error)]
293#[error("cache service task stopped")]
294pub struct CacheServiceUnavailable;
295
296impl From<CacheServiceUnavailable> for ProviderError {
297    fn from(err: CacheServiceUnavailable) -> Self {
298        Self::other(err)
299    }
300}
301
302/// A task that manages caches for data required by the `eth` rpc implementation.
303///
304/// It provides a caching layer on top of the given
305/// [`StateProvider`](reth_storage_api::StateProvider) and keeps data fetched via the provider in
306/// memory in an LRU cache. If the requested data is missing in the cache it is fetched and inserted
307/// into the cache afterwards. While fetching data from disk is sync, this service is async since
308/// requests and data is shared via channels.
309///
310/// This type is an endless future that listens for incoming messages from the user facing
311/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
312/// that does the IO and sends the result back to it. This way the caching service only
313/// handles messages and does LRU lookups and never blocking IO.
314///
315/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
316/// `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
317/// permits to limit concurrent requests.
318#[must_use = "Type does nothing unless spawned"]
319pub(crate) struct EthStateCacheService<
320    Provider,
321    Tasks,
322    LimitBlocks = ByLength,
323    LimitReceipts = ByLength,
324    LimitHeaders = ByLength,
325    LimitBals = ByLength,
326> where
327    Provider: BlockReader + BalProvider,
328    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
329    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
330    LimitHeaders: Limiter<B256, Provider::Header>,
331    LimitBals: Limiter<B256, CachedRevmBal>,
332{
333    /// The type used to lookup data from disk
334    provider: Provider,
335    /// The LRU cache for full blocks grouped by their block hash.
336    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
337    /// The LRU cache for block receipts grouped by the block hash.
338    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
339    /// The LRU cache for headers.
340    ///
341    /// Headers are cached because they are required to populate the environment for execution
342    /// (evm).
343    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
344    /// The LRU cache for revm BALs grouped by the block hash.
345    bal_cache: BalLruCache<LimitBals>,
346    /// Sender half of the action channel.
347    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
348    /// Receiver half of the action channel.
349    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
350    /// The type that's used to spawn tasks that do the actual work
351    action_task_spawner: Tasks,
352    /// Rate limiter for spawned fetch tasks.
353    ///
354    /// This restricts the max concurrent fetch tasks at the same time.
355    rate_limiter: Arc<Semaphore>,
356    /// LRU index mapping transaction hashes to their block hash and index within the block.
357    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
358}
359
360impl<Provider> EthStateCacheService<Provider, Runtime>
361where
362    Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
363{
364    /// Indexes all transactions in a block by transaction hash.
365    fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
366        let block_hash = block.hash();
367        for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
368            self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
369        }
370    }
371
372    /// Removes transaction index entries for a reorged block.
373    fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
374        for tx in block.body().transactions() {
375            self.tx_hash_index.remove(tx.tx_hash());
376        }
377    }
378
379    fn on_new_block(
380        &mut self,
381        block_hash: B256,
382        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
383    ) {
384        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
385            // send the response to queued senders
386            for tx in queued {
387                let _ = tx.send(res.clone());
388            }
389        }
390
391        // cache good block
392        if let Ok(Some(block)) = res {
393            self.full_block_cache.insert(block_hash, block);
394        }
395    }
396
397    fn on_new_receipts(
398        &mut self,
399        block_hash: B256,
400        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
401    ) {
402        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
403            // send the response to queued senders
404            for tx in queued {
405                let _ = tx.send(res.clone());
406            }
407        }
408
409        // cache good receipts
410        if let Ok(Some(receipts)) = res {
411            self.receipts_cache.insert(block_hash, receipts);
412        }
413    }
414
415    fn on_new_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
416        if let Some(queued) = self.bal_cache.remove(&block_hash) {
417            for tx in queued {
418                let _ = tx.send(res.clone());
419            }
420        }
421
422        if let Ok(Some(bal)) = res {
423            self.bal_cache.insert(block_hash, bal);
424        }
425    }
426
427    fn on_reorg_block(
428        &mut self,
429        block_hash: B256,
430        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
431    ) {
432        let res = res.map(|b| b.map(Arc::new));
433        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
434            // send the response to queued senders
435            for tx in queued {
436                let _ = tx.send(res.clone());
437            }
438        }
439    }
440
441    fn on_reorg_receipts(
442        &mut self,
443        block_hash: B256,
444        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
445    ) {
446        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
447            // send the response to queued senders
448            for tx in queued {
449                let _ = tx.send(res.clone());
450            }
451        }
452    }
453
454    fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
455        if let Some(queued) = self.headers_cache.remove(&block_hash) {
456            // send the response to queued senders
457            for tx in queued {
458                let _ = tx.send(res.clone());
459            }
460        }
461    }
462
463    fn on_reorg_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
464        if let Some(queued) = self.bal_cache.remove(&block_hash) {
465            for tx in queued {
466                let _ = tx.send(res.clone());
467            }
468        }
469    }
470
471    /// Shrinks the queues but leaves some space for the next requests
472    fn shrink_queues(&mut self) {
473        let min_capacity = 2;
474        self.full_block_cache.shrink_to(min_capacity);
475        self.receipts_cache.shrink_to(min_capacity);
476        self.headers_cache.shrink_to(min_capacity);
477        self.bal_cache.shrink_to(min_capacity);
478    }
479
480    fn update_cached_metrics(&self) {
481        self.full_block_cache.update_cached_metrics();
482        self.receipts_cache.update_cached_metrics();
483        self.headers_cache.update_cached_metrics();
484        self.bal_cache.update_cached_metrics();
485    }
486}
487
488impl<Provider> Future for EthStateCacheService<Provider, Runtime>
489where
490    Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
491{
492    type Output = ();
493
494    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
495        let this = self.get_mut();
496
497        loop {
498            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
499                // shrink queues if we don't have any work to do
500                this.shrink_queues();
501                return Poll::Pending;
502            };
503
504            match action {
505                None => {
506                    unreachable!("can't close")
507                }
508                Some(action) => {
509                    match action {
510                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
511                            let _ =
512                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
513                        }
514                        CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
515                            let block = this.full_block_cache.get(&block_hash).cloned();
516                            let receipts = this.receipts_cache.get(&block_hash).cloned();
517                            let _ = response_tx.send((block, receipts));
518                        }
519                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
520                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
521                                let _ = response_tx.send(Ok(Some(block)));
522                                continue
523                            }
524
525                            // block is not in the cache, request it if this is the first consumer
526                            if this.full_block_cache.queue(block_hash, response_tx) {
527                                let provider = this.provider.clone();
528                                let action_tx = this.action_tx.clone();
529                                let rate_limiter = this.rate_limiter.clone();
530                                let mut action_sender =
531                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
532                                this.action_task_spawner.spawn_blocking_task(async move {
533                                    // Acquire permit
534                                    let _permit = rate_limiter.acquire().await;
535                                    // Only look in the database to prevent situations where we
536                                    // looking up the tree is blocking
537                                    let block_sender = provider
538                                        .sealed_block_with_senders(
539                                            BlockHashOrNumber::Hash(block_hash),
540                                            TransactionVariant::WithHash,
541                                        )
542                                        .map(|maybe_block| maybe_block.map(Arc::new));
543                                    action_sender.send_block(block_sender);
544                                });
545                            }
546                        }
547                        CacheAction::GetReceipts { block_hash, response_tx } => {
548                            // check if block is cached
549                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
550                                let _ = response_tx.send(Ok(Some(receipts)));
551                                continue
552                            }
553
554                            // block is not in the cache, request it if this is the first consumer
555                            if this.receipts_cache.queue(block_hash, response_tx) {
556                                let provider = this.provider.clone();
557                                let action_tx = this.action_tx.clone();
558                                let rate_limiter = this.rate_limiter.clone();
559                                let mut action_sender =
560                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
561                                this.action_task_spawner.spawn_blocking_task(async move {
562                                    // Acquire permit
563                                    let _permit = rate_limiter.acquire().await;
564                                    let res = provider
565                                        .receipts_by_block(block_hash.into())
566                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));
567
568                                    action_sender.send_receipts(res);
569                                });
570                            }
571                        }
572                        CacheAction::GetHeader { block_hash, response_tx } => {
573                            // check if the header is cached
574                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
575                                let _ = response_tx.send(Ok(header));
576                                continue
577                            }
578
579                            // it's possible we have the entire block cached
580                            if let Some(block) = this.full_block_cache.get(&block_hash) {
581                                let _ = response_tx.send(Ok(block.clone_header()));
582                                continue
583                            }
584
585                            // header is not in the cache, request it if this is the first
586                            // consumer
587                            if this.headers_cache.queue(block_hash, response_tx) {
588                                let provider = this.provider.clone();
589                                let action_tx = this.action_tx.clone();
590                                let rate_limiter = this.rate_limiter.clone();
591                                let mut action_sender =
592                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
593                                this.action_task_spawner.spawn_blocking_task(async move {
594                                    // Acquire permit
595                                    let _permit = rate_limiter.acquire().await;
596                                    let header = provider.header(block_hash).and_then(|header| {
597                                        header.ok_or_else(|| {
598                                            ProviderError::HeaderNotFound(block_hash.into())
599                                        })
600                                    });
601                                    action_sender.send_header(header);
602                                });
603                            }
604                        }
605                        CacheAction::GetBal { block_hash, response_tx } => {
606                            if let Some(bal) = this.bal_cache.get(&block_hash).cloned() {
607                                let _ = response_tx.send(Ok(Some(bal)));
608                                continue
609                            }
610
611                            if this.bal_cache.queue(block_hash, response_tx) {
612                                let provider = this.provider.clone();
613                                let action_tx = this.action_tx.clone();
614                                let rate_limiter = this.rate_limiter.clone();
615                                let mut action_sender =
616                                    ActionSender::new(CacheKind::Bal, block_hash, action_tx);
617                                this.action_task_spawner.spawn_blocking_task(async move {
618                                    let _permit = rate_limiter.acquire().await;
619                                    let res = provider
620                                        .bal_store()
621                                        .revm_bal_by_hash(block_hash)
622                                        .map(|maybe_bal| maybe_bal.map(CachedRevmBal::new));
623                                    action_sender.send_bal(res);
624                                });
625                            }
626                        }
627                        CacheAction::ReceiptsResult { block_hash, res } => {
628                            this.on_new_receipts(block_hash, res);
629                        }
630                        CacheAction::BalResult { block_hash, res } => {
631                            this.on_new_bal(block_hash, res);
632                        }
633                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
634                            Ok(Some(block_with_senders)) => {
635                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
636                            }
637                            Ok(None) => {
638                                this.on_new_block(block_hash, Ok(None));
639                            }
640                            Err(e) => {
641                                this.on_new_block(block_hash, Err(e));
642                            }
643                        },
644                        CacheAction::HeaderResult { block_hash, res } => {
645                            let res = *res;
646                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
647                                // send the response to queued senders
648                                for tx in queued {
649                                    let _ = tx.send(res.clone());
650                                }
651                            }
652
653                            // cache good header
654                            if let Ok(data) = res {
655                                this.headers_cache.insert(block_hash, data);
656                            }
657                        }
658                        CacheAction::CacheNewCanonicalChain { chain_change } => {
659                            for block in chain_change.blocks {
660                                // Index transactions before caching the block
661                                this.index_block_transactions(&block);
662                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
663                            }
664
665                            for block_receipts in chain_change.receipts {
666                                this.on_new_receipts(
667                                    block_receipts.block_hash,
668                                    Ok(Some(Arc::new(block_receipts.receipts))),
669                                );
670                            }
671                        }
672                        CacheAction::RemoveReorgedChain { chain_change } => {
673                            for block in chain_change.blocks {
674                                let block_hash = block.hash();
675                                let header = block.clone_header();
676                                // Remove transaction index entries for reorged blocks
677                                this.remove_block_transactions(&block);
678                                this.on_reorg_block(block_hash, Ok(Some(block)));
679                                this.on_reorg_header(block_hash, Ok(header));
680                                this.on_reorg_bal(block_hash, Ok(None));
681                            }
682
683                            for block_receipts in chain_change.receipts {
684                                this.on_reorg_receipts(
685                                    block_receipts.block_hash,
686                                    Ok(Some(Arc::new(block_receipts.receipts))),
687                                );
688                            }
689                        }
690                        CacheAction::GetCachedParentBlocks {
691                            block_hash,
692                            max_blocks,
693                            response_tx,
694                        } => {
695                            let mut blocks = Vec::new();
696                            let mut current_hash = block_hash;
697
698                            // Start with the requested block
699                            while blocks.len() < max_blocks {
700                                if let Some(block) =
701                                    this.full_block_cache.get(&current_hash).cloned()
702                                {
703                                    // Get the parent hash for the next iteration
704                                    current_hash = block.header().parent_hash();
705                                    blocks.push(block);
706                                } else {
707                                    // Break the loop if we can't find the current block
708                                    break;
709                                }
710                            }
711
712                            let _ = response_tx.send(blocks);
713                        }
714                        CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
715                            let result =
716                                this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
717                                    let block = this.full_block_cache.get(block_hash).cloned()?;
718                                    let receipts = this.receipts_cache.get(block_hash).cloned();
719                                    Some(CachedTransaction::new(block, *idx, receipts))
720                                });
721                            let _ = response_tx.send(result);
722                        }
723                    };
724                    this.update_cached_metrics();
725                }
726            }
727        }
728    }
729}
730
731/// All message variants sent through the channel
732enum CacheAction<B: Block, R> {
733    GetBlockWithSenders {
734        block_hash: B256,
735        response_tx: BlockWithSendersResponseSender<B>,
736    },
737    GetHeader {
738        block_hash: B256,
739        response_tx: HeaderResponseSender<B::Header>,
740    },
741    GetReceipts {
742        block_hash: B256,
743        response_tx: ReceiptsResponseSender<R>,
744    },
745    GetBal {
746        block_hash: B256,
747        response_tx: BalResponseSender,
748    },
749    GetCachedBlock {
750        block_hash: B256,
751        response_tx: CachedBlockResponseSender<B>,
752    },
753    GetCachedBlockAndReceipts {
754        block_hash: B256,
755        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
756    },
757    BlockWithSendersResult {
758        block_hash: B256,
759        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
760    },
761    ReceiptsResult {
762        block_hash: B256,
763        res: ProviderResult<Option<Arc<Vec<R>>>>,
764    },
765    HeaderResult {
766        block_hash: B256,
767        res: Box<ProviderResult<B::Header>>,
768    },
769    BalResult {
770        block_hash: B256,
771        res: ProviderResult<Option<CachedRevmBal>>,
772    },
773    CacheNewCanonicalChain {
774        chain_change: ChainChange<B, R>,
775    },
776    RemoveReorgedChain {
777        chain_change: ChainChange<B, R>,
778    },
779    GetCachedParentBlocks {
780        block_hash: B256,
781        max_blocks: usize,
782        response_tx: CachedParentBlocksResponseSender<B>,
783    },
784    /// Look up a transaction's cached data by its hash
785    GetTransactionByHash {
786        tx_hash: TxHash,
787        response_tx: TransactionHashResponseSender<B, R>,
788    },
789}
790
791struct BlockReceipts<R> {
792    block_hash: B256,
793    receipts: Vec<R>,
794}
795
796/// A change of the canonical chain
797struct ChainChange<B: Block, R> {
798    blocks: Vec<RecoveredBlock<B>>,
799    receipts: Vec<BlockReceipts<R>>,
800}
801
802impl<B: Block, R: Clone> ChainChange<B, R> {
803    fn new<N>(chain: Arc<Chain<N>>) -> Self
804    where
805        N: NodePrimitives<Block = B, Receipt = R>,
806    {
807        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
808            .blocks_and_receipts()
809            .map(|(block, receipts)| {
810                let block_receipts =
811                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
812                (block.clone(), block_receipts)
813            })
814            .unzip();
815        Self { blocks, receipts }
816    }
817}
818
/// Names the cache a pending fetch belongs to.
///
/// [`ActionSender`] uses it on drop to pick which `*Result` message carries the error.
#[derive(Clone, Copy, Debug)]
enum CacheKind {
    /// Reported through `CacheAction::BlockWithSendersResult`.
    Block,
    /// Reported through `CacheAction::ReceiptsResult`.
    Receipt,
    /// Reported through `CacheAction::HeaderResult`.
    Header,
    /// Reported through `CacheAction::BalResult`.
    Bal,
}
827
828/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
829/// before a result could be sent.
830///
831/// This type wraps a sender and in case the sender is still present on drop emit an error response.
832#[derive(Debug)]
833struct ActionSender<B: Block, R: Send + Sync> {
834    kind: CacheKind,
835    blockhash: B256,
836    tx: Option<UnboundedSender<CacheAction<B, R>>>,
837}
838
839impl<R: Send + Sync, B: Block> ActionSender<B, R> {
840    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
841        Self { kind, blockhash, tx: Some(tx) }
842    }
843
844    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
845        if let Some(tx) = self.tx.take() {
846            let _ = tx.send(CacheAction::BlockWithSendersResult {
847                block_hash: self.blockhash,
848                res: block_sender,
849            });
850        }
851    }
852
853    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
854        if let Some(tx) = self.tx.take() {
855            let _ =
856                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
857        }
858    }
859
860    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
861        if let Some(tx) = self.tx.take() {
862            let _ = tx.send(CacheAction::HeaderResult {
863                block_hash: self.blockhash,
864                res: Box::new(header),
865            });
866        }
867    }
868
869    fn send_bal(&mut self, bal: Result<Option<CachedRevmBal>, ProviderError>) {
870        if let Some(tx) = self.tx.take() {
871            let _ = tx.send(CacheAction::BalResult { block_hash: self.blockhash, res: bal });
872        }
873    }
874}
875impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
876    fn drop(&mut self) {
877        if let Some(tx) = self.tx.take() {
878            let msg = match self.kind {
879                CacheKind::Block => CacheAction::BlockWithSendersResult {
880                    block_hash: self.blockhash,
881                    res: Err(CacheServiceUnavailable.into()),
882                },
883                CacheKind::Receipt => CacheAction::ReceiptsResult {
884                    block_hash: self.blockhash,
885                    res: Err(CacheServiceUnavailable.into()),
886                },
887                CacheKind::Header => CacheAction::HeaderResult {
888                    block_hash: self.blockhash,
889                    res: Box::new(Err(CacheServiceUnavailable.into())),
890                },
891                CacheKind::Bal => CacheAction::BalResult {
892                    block_hash: self.blockhash,
893                    res: Err(CacheServiceUnavailable.into()),
894                },
895            };
896            let _ = tx.send(msg);
897        }
898    }
899}
900
901/// Awaits for new chain events and directly inserts them into the cache so they're available
902/// immediately before they need to be fetched from disk.
903///
904/// Reorged blocks are removed from the cache.
905pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
906    eth_state_cache: EthStateCache<N>,
907    mut events: St,
908) where
909    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
910{
911    while let Some(event) = events.next().await {
912        if let Some(reverted) = event.reverted() {
913            let chain_change = ChainChange::new(reverted);
914
915            let _ =
916                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
917        }
918
919        let chain_change = ChainChange::new(event.committed());
920
921        let _ =
922            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
923    }
924}
925
926/// Cached decoded revm BAL.
927#[derive(Clone, Debug)]
928pub(crate) struct CachedRevmBal(Arc<DecodedBal<RevmBal>>);
929
930impl CachedRevmBal {
931    /// Creates a cached revm BAL from an owned decoded BAL.
932    #[inline]
933    fn new(bal: DecodedBal<RevmBal>) -> Self {
934        Self(Arc::new(bal))
935    }
936}
937
938impl InMemorySize for CachedRevmBal {
939    fn size(&self) -> usize {
940        core::mem::size_of::<Self>() + decoded_revm_bal_size(&self.0)
941    }
942}
943
944fn decoded_revm_bal_size(bal: &DecodedBal<RevmBal>) -> usize {
945    core::mem::size_of::<DecodedBal<RevmBal>>() + bal.as_raw().len() + revm_bal_size(bal.as_bal())
946}
947
948fn revm_bal_size(bal: &RevmBal) -> usize {
949    core::mem::size_of::<RevmBal>() +
950        bal.accounts.capacity() * core::mem::size_of::<(Address, RevmAccountBal)>() +
951        bal.accounts.values().map(revm_account_bal_heap_size).sum::<usize>()
952}
953
954fn revm_account_bal_heap_size(account: &RevmAccountBal) -> usize {
955    revm_account_info_bal_heap_size(&account.account_info) +
956        revm_storage_bal_heap_size(&account.storage)
957}
958
959fn revm_account_info_bal_heap_size(account_info: &RevmAccountInfoBal) -> usize {
960    revm_bal_writes_heap_size(&account_info.nonce, |_| 0) +
961        revm_bal_writes_heap_size(&account_info.balance, |_| 0) +
962        revm_bal_writes_heap_size(&account_info.code, revm_code_write_heap_size)
963}
964
965fn revm_storage_bal_heap_size(storage: &RevmStorageBal) -> usize {
966    storage.storage.len() * core::mem::size_of::<(StorageKey, RevmBalWrites<StorageValue>)>() +
967        storage
968            .storage
969            .values()
970            .map(|writes| revm_bal_writes_heap_size(writes, |_| 0))
971            .sum::<usize>()
972}
973
974fn revm_bal_writes_heap_size<T, F>(writes: &RevmBalWrites<T>, mut item_heap_size: F) -> usize
975where
976    T: PartialEq + Clone,
977    F: FnMut(&T) -> usize,
978{
979    writes.writes.capacity() * core::mem::size_of::<(u64, T)>() +
980        writes.writes.iter().map(|(_, item)| item_heap_size(item)).sum::<usize>()
981}
982
983fn revm_code_write_heap_size((_, bytecode): &(B256, Bytecode)) -> usize {
984    bytecode.bytes_ref().len()
985}
986
987#[cfg(test)]
988mod tests {
989    use super::*;
990    use alloy_consensus::{transaction::TransactionMeta, Header};
991    use alloy_eip7928::BlockAccessIndex;
992    use alloy_eips::{BlockHashOrNumber, NumHash};
993    use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, Signature, TxHash, TxNumber};
994    use core::ops::{RangeBounds, RangeInclusive};
995    use reth_db_models::StoredBlockBodyIndices;
996    use reth_ethereum_primitives::{
997        Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
998    };
999    use reth_primitives_traits::{RecoveredBlock, SealedHeader};
1000    use reth_storage_api::{
1001        noop::NoopProvider, BalProvider, BalStore, BalStoreHandle, BlockBodyIndicesProvider,
1002        BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider,
1003        TransactionVariant, TransactionsProvider,
1004    };
1005    use std::sync::atomic::{AtomicUsize, Ordering};
1006
1007    fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
1008        let (_cache, service) = EthStateCache::<EthPrimitives>::create(
1009            NoopProvider::default(),
1010            Runtime::test(),
1011            EthStateCacheConfig {
1012                max_blocks: 4,
1013                max_receipts: 4,
1014                max_headers: 4,
1015                max_bals: 4,
1016                max_concurrent_db_requests: 1,
1017                max_cached_tx_hashes: 16,
1018            },
1019        );
1020        service
1021    }
1022
1023    fn test_decoded_revm_bal() -> DecodedBal<RevmBal> {
1024        DecodedBal::new(RevmBal::default(), Bytes::from_static(&[0xc0]))
1025    }
1026
1027    fn test_block() -> RecoveredBlock<Block> {
1028        RecoveredBlock::new_unhashed(
1029            Block {
1030                header: Header { number: 1, ..Default::default() },
1031                body: BlockBody {
1032                    transactions: vec![TransactionSigned::new_unhashed(
1033                        Transaction::Legacy(Default::default()),
1034                        Signature::test_signature(),
1035                    )],
1036                    ..Default::default()
1037                },
1038            },
1039            vec![Address::ZERO],
1040        )
1041    }
1042
1043    #[test]
1044    fn reorg_evicts_cached_headers() {
1045        let mut service = test_service();
1046        let block_hash = B256::repeat_byte(0x11);
1047
1048        assert!(service
1049            .headers_cache
1050            .insert(block_hash, Header { number: 42, ..Default::default() }));
1051        assert!(service.headers_cache.get(&block_hash).is_some());
1052
1053        service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
1054
1055        assert!(service.headers_cache.get(&block_hash).is_none());
1056    }
1057
1058    #[test]
1059    fn reorg_forwards_header_to_queued_requests() {
1060        let mut service = test_service();
1061        let block_hash = B256::repeat_byte(0x22);
1062        let (response_tx, mut response_rx) = oneshot::channel();
1063        let header = Header { number: 7, ..Default::default() };
1064
1065        assert!(service.headers_cache.queue(block_hash, response_tx));
1066
1067        service.on_reorg_header(block_hash, Ok(header));
1068
1069        let header =
1070            response_rx.try_recv().expect("queued header response").expect("header result");
1071
1072        assert_eq!(header.number, 7);
1073    }
1074
1075    #[test]
1076    fn reorg_removes_tx_hash_index_entries_unconditionally() {
1077        let mut service = test_service();
1078        let block = test_block();
1079        let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
1080
1081        service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
1082
1083        service.remove_block_transactions(&block);
1084
1085        assert!(service.tx_hash_index.get(&tx_hash).is_none());
1086    }
1087
1088    #[test]
1089    fn reorg_evicts_cached_bal() {
1090        let mut service = test_service();
1091        let block_hash = B256::repeat_byte(0x44);
1092
1093        assert!(service.bal_cache.insert(block_hash, CachedRevmBal::new(test_decoded_revm_bal())));
1094        assert!(service.bal_cache.get(&block_hash).is_some());
1095
1096        service.on_reorg_bal(block_hash, Ok(None));
1097
1098        assert!(service.bal_cache.get(&block_hash).is_none());
1099    }
1100
1101    #[test]
1102    fn reorg_forwards_bal_to_queued_requests() {
1103        let mut service = test_service();
1104        let block_hash = B256::repeat_byte(0x55);
1105        let (response_tx, mut response_rx) = oneshot::channel();
1106        let bal = CachedRevmBal::new(test_decoded_revm_bal());
1107
1108        assert!(service.bal_cache.queue(block_hash, response_tx));
1109
1110        service.on_reorg_bal(block_hash, Ok(Some(bal)));
1111
1112        let bal = response_rx.try_recv().expect("queued BAL response").expect("BAL result");
1113
1114        assert!(bal.is_some());
1115    }
1116
1117    #[test]
1118    fn cached_revm_bal_size_accounts_for_nested_allocations() {
1119        let mut account = RevmAccountBal::default();
1120        account.account_info.nonce.writes.push((BlockAccessIndex::new(1), 1));
1121        account
1122            .account_info
1123            .balance
1124            .writes
1125            .push((BlockAccessIndex::new(2), StorageValue::from(1u64)));
1126        account.account_info.code.writes.push((
1127            BlockAccessIndex::new(3),
1128            (B256::repeat_byte(0xaa), Bytecode::new_raw(Bytes::from_static(&[0x60, 0x00]))),
1129        ));
1130        account.storage.storage.insert(
1131            StorageKey::from(1u64),
1132            RevmBalWrites::new(vec![(BlockAccessIndex::new(4), StorageValue::from(2u64))]),
1133        );
1134
1135        let mut bal = RevmBal::default();
1136        bal.accounts.insert(Address::ZERO, account);
1137
1138        let raw = Bytes::from_static(&[0xc0, 0x01, 0x02]);
1139        let previous_estimate = core::mem::size_of::<CachedRevmBal>() +
1140            core::mem::size_of::<DecodedBal<RevmBal>>() +
1141            raw.len() +
1142            core::mem::size_of::<RevmBal>();
1143        assert!(CachedRevmBal::new(DecodedBal::new(bal, raw)).size() > previous_estimate);
1144    }
1145
1146    #[tokio::test]
1147    async fn get_bal_uses_cached_revm_bal() {
1148        let fetches = Arc::new(AtomicUsize::default());
1149        let provider = TestBalProvider::new(fetches.clone());
1150        let cache = EthStateCache::<EthPrimitives>::spawn_with(
1151            provider,
1152            EthStateCacheConfig {
1153                max_blocks: 0,
1154                max_receipts: 0,
1155                max_headers: 0,
1156                max_bals: 4,
1157                max_concurrent_db_requests: 1,
1158                max_cached_tx_hashes: 0,
1159            },
1160            Runtime::test(),
1161        );
1162        let block_hash = B256::repeat_byte(0x66);
1163
1164        assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1165        assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1166
1167        assert_eq!(fetches.load(Ordering::SeqCst), 1);
1168    }
1169
1170    #[tokio::test]
1171    async fn concurrent_get_bal_requests_share_fetch() {
1172        let fetches = Arc::new(AtomicUsize::default());
1173        let provider = TestBalProvider::new(fetches.clone());
1174        let cache = EthStateCache::<EthPrimitives>::spawn_with(
1175            provider,
1176            EthStateCacheConfig {
1177                max_blocks: 0,
1178                max_receipts: 0,
1179                max_headers: 0,
1180                max_bals: 4,
1181                max_concurrent_db_requests: 1,
1182                max_cached_tx_hashes: 0,
1183            },
1184            Runtime::test(),
1185        );
1186        let block_hash = B256::repeat_byte(0x77);
1187
1188        let (first, second) = tokio::join!(cache.get_bal(block_hash), cache.get_bal(block_hash));
1189
1190        assert!(first.unwrap().is_some());
1191        assert!(second.unwrap().is_some());
1192        assert_eq!(fetches.load(Ordering::SeqCst), 1);
1193    }
1194
1195    #[derive(Clone, Debug, Default)]
1196    struct TestBalProvider {
1197        bal_store: BalStoreHandle,
1198    }
1199
1200    impl TestBalProvider {
1201        fn new(fetches: Arc<AtomicUsize>) -> Self {
1202            Self { bal_store: BalStoreHandle::new(TestBalStore { fetches }) }
1203        }
1204    }
1205
1206    impl BalProvider for TestBalProvider {
1207        fn bal_store(&self) -> &BalStoreHandle {
1208            &self.bal_store
1209        }
1210    }
1211
1212    #[derive(Debug)]
1213    struct TestBalStore {
1214        fetches: Arc<AtomicUsize>,
1215    }
1216
1217    impl BalStore for TestBalStore {
1218        fn insert(
1219            &self,
1220            _num_hash: NumHash,
1221            _bal: reth_storage_api::SealedBal,
1222        ) -> ProviderResult<()> {
1223            Ok(())
1224        }
1225
1226        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
1227            Ok(0)
1228        }
1229
1230        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
1231            Ok(block_hashes.iter().map(|_| None).collect())
1232        }
1233
1234        fn revm_bal_by_hash(
1235            &self,
1236            _block_hash: BlockHash,
1237        ) -> ProviderResult<Option<DecodedBal<RevmBal>>> {
1238            self.fetches.fetch_add(1, Ordering::SeqCst);
1239            Ok(Some(test_decoded_revm_bal()))
1240        }
1241
1242        fn bal_stream(&self) -> reth_storage_api::BalNotificationStream {
1243            reth_storage_api::NoopBalStore.bal_stream()
1244        }
1245    }
1246
1247    impl BlockHashReader for TestBalProvider {
1248        fn block_hash(&self, _number: BlockNumber) -> ProviderResult<Option<B256>> {
1249            Ok(None)
1250        }
1251
1252        fn canonical_hashes_range(
1253            &self,
1254            _start: BlockNumber,
1255            _end: BlockNumber,
1256        ) -> ProviderResult<Vec<B256>> {
1257            Ok(Vec::new())
1258        }
1259    }
1260
1261    impl BlockNumReader for TestBalProvider {
1262        fn chain_info(&self) -> ProviderResult<reth_chainspec::ChainInfo> {
1263            Ok(reth_chainspec::ChainInfo::default())
1264        }
1265
1266        fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1267            Ok(0)
1268        }
1269
1270        fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1271            Ok(0)
1272        }
1273
1274        fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1275            Ok(None)
1276        }
1277    }
1278
1279    impl HeaderProvider for TestBalProvider {
1280        type Header = Header;
1281
1282        fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1283            Ok(None)
1284        }
1285
1286        fn header_by_number(&self, _num: u64) -> ProviderResult<Option<Self::Header>> {
1287            Ok(None)
1288        }
1289
1290        fn headers_range(
1291            &self,
1292            _range: impl RangeBounds<BlockNumber>,
1293        ) -> ProviderResult<Vec<Self::Header>> {
1294            Ok(Vec::new())
1295        }
1296
1297        fn sealed_header(
1298            &self,
1299            _number: BlockNumber,
1300        ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1301            Ok(None)
1302        }
1303
1304        fn sealed_headers_while(
1305            &self,
1306            _range: impl RangeBounds<BlockNumber>,
1307            _predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1308        ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1309            Ok(Vec::new())
1310        }
1311    }
1312
1313    impl BlockBodyIndicesProvider for TestBalProvider {
1314        fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1315            Ok(None)
1316        }
1317
1318        fn block_body_indices_range(
1319            &self,
1320            _range: RangeInclusive<BlockNumber>,
1321        ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1322            Ok(Vec::new())
1323        }
1324    }
1325
1326    impl TransactionsProvider for TestBalProvider {
1327        type Transaction = TransactionSigned;
1328
1329        fn transaction_id(&self, _tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1330            Ok(None)
1331        }
1332
1333        fn transaction_by_id(&self, _id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1334            Ok(None)
1335        }
1336
1337        fn transaction_by_id_unhashed(
1338            &self,
1339            _id: TxNumber,
1340        ) -> ProviderResult<Option<Self::Transaction>> {
1341            Ok(None)
1342        }
1343
1344        fn transaction_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1345            Ok(None)
1346        }
1347
1348        fn transaction_by_hash_with_meta(
1349            &self,
1350            _hash: TxHash,
1351        ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1352            Ok(None)
1353        }
1354
1355        fn transactions_by_block(
1356            &self,
1357            _block: BlockHashOrNumber,
1358        ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1359            Ok(None)
1360        }
1361
1362        fn transactions_by_block_range(
1363            &self,
1364            _range: impl RangeBounds<BlockNumber>,
1365        ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1366            Ok(Vec::new())
1367        }
1368
1369        fn transactions_by_tx_range(
1370            &self,
1371            _range: impl RangeBounds<TxNumber>,
1372        ) -> ProviderResult<Vec<Self::Transaction>> {
1373            Ok(Vec::new())
1374        }
1375
1376        fn senders_by_tx_range(
1377            &self,
1378            _range: impl RangeBounds<TxNumber>,
1379        ) -> ProviderResult<Vec<Address>> {
1380            Ok(Vec::new())
1381        }
1382
1383        fn transaction_sender(&self, _id: TxNumber) -> ProviderResult<Option<Address>> {
1384            Ok(None)
1385        }
1386    }
1387
1388    impl ReceiptProvider for TestBalProvider {
1389        type Receipt = Receipt;
1390
1391        fn receipt(&self, _id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1392            Ok(None)
1393        }
1394
1395        fn receipt_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1396            Ok(None)
1397        }
1398
1399        fn receipts_by_block(
1400            &self,
1401            _block: BlockHashOrNumber,
1402        ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1403            Ok(None)
1404        }
1405
1406        fn receipts_by_tx_range(
1407            &self,
1408            _range: impl RangeBounds<TxNumber>,
1409        ) -> ProviderResult<Vec<Self::Receipt>> {
1410            Ok(Vec::new())
1411        }
1412
1413        fn receipts_by_block_range(
1414            &self,
1415            _block_range: RangeInclusive<BlockNumber>,
1416        ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1417            Ok(Vec::new())
1418        }
1419    }
1420
1421    impl BlockReader for TestBalProvider {
1422        type Block = Block;
1423
1424        fn find_block_by_hash(
1425            &self,
1426            _hash: B256,
1427            _source: BlockSource,
1428        ) -> ProviderResult<Option<Self::Block>> {
1429            Ok(None)
1430        }
1431
1432        fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1433            Ok(None)
1434        }
1435
1436        fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1437            Ok(None)
1438        }
1439
1440        fn pending_block_and_receipts(
1441            &self,
1442        ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1443            Ok(None)
1444        }
1445
1446        fn recovered_block(
1447            &self,
1448            _id: BlockHashOrNumber,
1449            _transaction_kind: TransactionVariant,
1450        ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1451            Ok(None)
1452        }
1453
1454        fn sealed_block_with_senders(
1455            &self,
1456            _id: BlockHashOrNumber,
1457            _transaction_kind: TransactionVariant,
1458        ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1459            Ok(None)
1460        }
1461
1462        fn block_range(
1463            &self,
1464            _range: RangeInclusive<BlockNumber>,
1465        ) -> ProviderResult<Vec<Self::Block>> {
1466            Ok(Vec::new())
1467        }
1468
1469        fn block_with_senders_range(
1470            &self,
1471            _range: RangeInclusive<BlockNumber>,
1472        ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1473            Ok(Vec::new())
1474        }
1475
1476        fn recovered_block_range(
1477            &self,
1478            _range: RangeInclusive<BlockNumber>,
1479        ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1480            Ok(Vec::new())
1481        }
1482
1483        fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1484            Ok(None)
1485        }
1486    }
1487}