Skip to main content

reth_rpc_eth_types/cache/
mod.rs

1//! Async caching support for eth RPC
2
3use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eip7928::bal::DecodedBal;
7use alloy_eips::BlockHashOrNumber;
8use alloy_primitives::{Address, TxHash, B256};
9use futures::{stream::FuturesOrdered, Stream, StreamExt};
10use reth_chain_state::CanonStateNotification;
11use reth_errors::{ProviderError, ProviderResult};
12use reth_execution_types::Chain;
13use reth_primitives_traits::{Block, BlockBody, InMemorySize, NodePrimitives, RecoveredBlock};
14use reth_revm::{
15    bytecode::Bytecode,
16    primitives::{StorageKey, StorageValue},
17    state::bal::{
18        AccountBal as RevmAccountBal, AccountInfoBal as RevmAccountInfoBal, Bal as RevmBal,
19        BalWrites as RevmBalWrites, StorageBal as RevmStorageBal,
20    },
21};
22use reth_storage_api::{BalProvider, BlockReader, TransactionVariant};
23use reth_tasks::Runtime;
24use schnellru::{ByLength, Limiter, LruMap};
25use std::{
26    future::Future,
27    pin::Pin,
28    sync::Arc,
29    task::{Context, Poll},
30};
31use tokio::sync::{
32    mpsc::{unbounded_channel, UnboundedSender},
33    oneshot, Semaphore,
34};
35use tokio_stream::wrappers::UnboundedReceiverStream;
36
37pub mod config;
38pub mod db;
39pub mod metrics;
40pub mod multi_consumer;
41
/// The type that can send the response to a requested [`RecoveredBlock`]
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// The type that can send the response to a cache-only block lookup.
///
/// The payload is `None` when the block is not currently cached; no provider lookup is involved,
/// hence there is no error channel.
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// The type that can send the response to a cache-only lookup of both the block and its receipts.
///
/// Each half of the tuple is independently `None` when it is not cached.
type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// The type that can send the response to a requested header
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

/// The type that can send the response to a requested revm BAL.
type BalResponseSender = oneshot::Sender<ProviderResult<Option<CachedRevmBal>>>;

/// Cache of recovered blocks keyed by block hash, limited by `L`.
///
/// Requests for a missing block are queued as [`BlockWithSendersResponseSender`]s.
type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

/// Cache of block receipts keyed by block hash, limited by `L`.
///
/// Requests for missing receipts are queued as [`ReceiptsResponseSender`]s.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// Cache of headers keyed by block hash, limited by `L`.
///
/// Requests for a missing header are queued as [`HeaderResponseSender`]s.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;

/// Cache of revm BALs keyed by block hash, limited by `L`.
///
/// Requests for a missing BAL are queued as [`BalResponseSender`]s.
type BalLruCache<L> = MultiConsumerLruCache<B256, CachedRevmBal, L, BalResponseSender>;
75
/// Provides async access to cached eth data
///
/// This is the frontend for the async caching service which manages cached data on a different
/// task.
///
/// Every request is sent to the service as a `CacheAction` over an unbounded channel and answered
/// through a per-request oneshot channel.
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    /// Sender half of the action channel consumed by the cache service task.
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}
84
85impl<N: NodePrimitives> Clone for EthStateCache<N> {
86    fn clone(&self) -> Self {
87        Self { to_service: self.to_service.clone() }
88    }
89}
90
91impl<N: NodePrimitives> EthStateCache<N> {
92    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
93    fn create<Provider>(
94        provider: Provider,
95        action_task_spawner: Runtime,
96        config: EthStateCacheConfig,
97    ) -> (Self, EthStateCacheService<Provider, Runtime>)
98    where
99        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + BalProvider,
100    {
101        let EthStateCacheConfig {
102            max_blocks,
103            max_receipts,
104            max_headers,
105            max_bals,
106            max_concurrent_db_requests,
107            max_cached_tx_hashes,
108        } = config;
109        let (to_service, rx) = unbounded_channel();
110
111        let service = EthStateCacheService {
112            provider,
113            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
114            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
115            headers_cache: HeaderLruCache::new(max_headers, "headers"),
116            bal_cache: BalLruCache::new(max_bals, "bals"),
117            action_tx: to_service.clone(),
118            action_rx: UnboundedReceiverStream::new(rx),
119            action_task_spawner,
120            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_requests)),
121            tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
122        };
123        let cache = Self { to_service };
124        (cache, service)
125    }
126
127    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
128    /// spawner.
129    ///
130    /// The cache is memory limited by the given max bytes values.
131    pub fn spawn_with<Provider>(
132        provider: Provider,
133        config: EthStateCacheConfig,
134        executor: Runtime,
135    ) -> Self
136    where
137        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>
138            + BalProvider
139            + Clone
140            + Unpin
141            + 'static,
142    {
143        let (this, service) = Self::create(provider, executor.clone(), config);
144        executor.spawn_critical_task("eth state cache", service);
145        this
146    }
147
148    /// Requests the  [`RecoveredBlock`] for the block hash
149    ///
150    /// Returns `None` if the block does not exist.
151    pub async fn get_recovered_block(
152        &self,
153        block_hash: B256,
154    ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
155        let (response_tx, rx) = oneshot::channel();
156        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
157        rx.await.map_err(|_| CacheServiceUnavailable)?
158    }
159
160    /// Requests the receipts for the block hash
161    ///
162    /// Returns `None` if the block was not found.
163    pub async fn get_receipts(
164        &self,
165        block_hash: B256,
166    ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
167        let (response_tx, rx) = oneshot::channel();
168        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
169        rx.await.map_err(|_| CacheServiceUnavailable)?
170    }
171
172    /// Fetches both receipts and block for the given block hash.
173    pub async fn get_block_and_receipts(
174        &self,
175        block_hash: B256,
176    ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
177        let block = self.get_recovered_block(block_hash);
178        let receipts = self.get_receipts(block_hash);
179
180        let (block, receipts) = futures::try_join!(block, receipts)?;
181
182        Ok(block.zip(receipts))
183    }
184
185    /// Retrieves receipts and blocks from cache if block is in the cache, otherwise only receipts.
186    pub async fn get_receipts_and_maybe_block(
187        &self,
188        block_hash: B256,
189    ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
190        let (response_tx, rx) = oneshot::channel();
191        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
192
193        let receipts = self.get_receipts(block_hash);
194
195        let (receipts, block) = futures::join!(receipts, rx);
196
197        let block = block.map_err(|_| CacheServiceUnavailable)?;
198        Ok(receipts?.map(|r| (r, block)))
199    }
200
201    /// Retrieves both block and receipts from cache if available.
202    pub async fn maybe_cached_block_and_receipts(
203        &self,
204        block_hash: B256,
205    ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
206        let (response_tx, rx) = oneshot::channel();
207        let _ = self
208            .to_service
209            .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
210        rx.await.map_err(|_| CacheServiceUnavailable.into())
211    }
212
213    /// Streams cached receipts and blocks for a list of block hashes, preserving input order.
214    #[expect(clippy::type_complexity)]
215    pub fn get_receipts_and_maybe_block_stream<'a>(
216        &'a self,
217        hashes: Vec<B256>,
218    ) -> impl Stream<
219        Item = ProviderResult<
220            Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
221        >,
222    > + 'a {
223        let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
224
225        futures.collect::<FuturesOrdered<_>>()
226    }
227
228    /// Requests the header for the given hash.
229    ///
230    /// Returns an error if the header is not found.
231    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
232        let (response_tx, rx) = oneshot::channel();
233        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
234        rx.await.map_err(|_| CacheServiceUnavailable)?
235    }
236
237    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
238    /// and traversing down through parent hashes. Returns blocks in descending order (newest
239    /// first).
240    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
241    /// cache without making separate database requests.
242    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
243    /// with at least one block.
244    pub async fn get_cached_parent_blocks(
245        &self,
246        block_hash: B256,
247        max_blocks: usize,
248    ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
249        let (response_tx, rx) = oneshot::channel();
250        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
251            block_hash,
252            max_blocks,
253            response_tx,
254        });
255
256        let blocks = rx.await.unwrap_or_default();
257        if blocks.is_empty() {
258            None
259        } else {
260            Some(blocks)
261        }
262    }
263
264    /// Looks up a transaction by its hash in the cache index.
265    ///
266    /// Returns the cached block, transaction index, and optionally receipts if the transaction
267    /// is in a cached block.
268    pub async fn get_transaction_by_hash(
269        &self,
270        tx_hash: TxHash,
271    ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
272        let (response_tx, rx) = oneshot::channel();
273        let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
274        rx.await.ok()?
275    }
276
277    /// Requests the revm BAL for the block hash.
278    ///
279    /// Returns `None` if the BAL does not exist.
280    pub async fn get_bal(
281        &self,
282        block_hash: B256,
283    ) -> ProviderResult<Option<Arc<DecodedBal<Arc<RevmBal>>>>> {
284        let (response_tx, rx) = oneshot::channel();
285        let _ = self.to_service.send(CacheAction::GetBal { block_hash, response_tx });
286        rx.await
287            .map_err(|_| CacheServiceUnavailable)?
288            .map(|maybe_bal| maybe_bal.map(|cached| cached.0))
289    }
290}
/// Error returned when the cache service task has stopped.
///
/// The frontend produces it when the response channel of a request is closed without a reply.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;
295
296impl From<CacheServiceUnavailable> for ProviderError {
297    fn from(err: CacheServiceUnavailable) -> Self {
298        Self::other(err)
299    }
300}
301
/// A task that manages caches for data required by the `eth` rpc implementation.
///
/// It provides a caching layer on top of the given provider (a [`BlockReader`] that is also a
/// [`BalProvider`]) and keeps data fetched via the provider in memory in an LRU cache. If the
/// requested data is missing in the cache it is fetched and inserted into the cache afterwards.
/// While fetching data from disk is sync, this service is async since requests and data are
/// shared via channels.
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
/// that does the IO and sends the result back to it. This way the caching service only
/// handles messages and does LRU lookups and never blocking IO.
///
/// Caution: The channel for the data is _unbounded_. It is assumed that this is mainly used by the
/// `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
/// permits to limit concurrent requests.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
    LimitBals = ByLength,
> where
    Provider: BlockReader + BalProvider,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
    LimitBals: Limiter<B256, CachedRevmBal>,
{
    /// The type used to lookup data from disk
    provider: Provider,
    /// The LRU cache for full blocks grouped by their block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// The LRU cache for block receipts grouped by the block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// The LRU cache for headers.
    ///
    /// Headers are cached because they are required to populate the environment for execution
    /// (evm).
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// The LRU cache for revm BALs grouped by the block hash.
    bal_cache: BalLruCache<LimitBals>,
    /// Sender half of the action channel.
    ///
    /// Cloned into spawned fetch tasks so they can send their results back to this service.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// The type that's used to spawn tasks that do the actual work
    action_task_spawner: Tasks,
    /// Rate limiter for spawned fetch tasks.
    ///
    /// This restricts the max concurrent fetch tasks at the same time.
    rate_limiter: Arc<Semaphore>,
    /// LRU index mapping transaction hashes to their block hash and index within the block.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
359
360impl<Provider> EthStateCacheService<Provider, Runtime>
361where
362    Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
363{
364    /// Indexes all transactions in a block by transaction hash.
365    fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
366        let block_hash = block.hash();
367        for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
368            self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
369        }
370    }
371
372    /// Removes transaction index entries for a reorged block.
373    fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
374        for tx in block.body().transactions() {
375            self.tx_hash_index.remove(tx.tx_hash());
376        }
377    }
378
379    fn on_new_block(
380        &mut self,
381        block_hash: B256,
382        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
383    ) {
384        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
385            // send the response to queued senders
386            for tx in queued {
387                let _ = tx.send(res.clone());
388            }
389        }
390
391        // cache good block
392        if let Ok(Some(block)) = res {
393            self.full_block_cache.insert(block_hash, block);
394        }
395    }
396
397    fn on_new_receipts(
398        &mut self,
399        block_hash: B256,
400        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
401    ) {
402        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
403            // send the response to queued senders
404            for tx in queued {
405                let _ = tx.send(res.clone());
406            }
407        }
408
409        // cache good receipts
410        if let Ok(Some(receipts)) = res {
411            self.receipts_cache.insert(block_hash, receipts);
412        }
413    }
414
415    fn on_new_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
416        if let Some(queued) = self.bal_cache.remove(&block_hash) {
417            for tx in queued {
418                let _ = tx.send(res.clone());
419            }
420        }
421
422        if let Ok(Some(bal)) = res {
423            self.bal_cache.insert(block_hash, bal);
424        }
425    }
426
427    fn on_reorg_block(
428        &mut self,
429        block_hash: B256,
430        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
431    ) {
432        let res = res.map(|b| b.map(Arc::new));
433        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
434            // send the response to queued senders
435            for tx in queued {
436                let _ = tx.send(res.clone());
437            }
438        }
439    }
440
441    fn on_reorg_receipts(
442        &mut self,
443        block_hash: B256,
444        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
445    ) {
446        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
447            // send the response to queued senders
448            for tx in queued {
449                let _ = tx.send(res.clone());
450            }
451        }
452    }
453
454    fn on_reorg_header(&mut self, block_hash: B256, res: ProviderResult<Provider::Header>) {
455        if let Some(queued) = self.headers_cache.remove(&block_hash) {
456            // send the response to queued senders
457            for tx in queued {
458                let _ = tx.send(res.clone());
459            }
460        }
461    }
462
463    fn on_reorg_bal(&mut self, block_hash: B256, res: ProviderResult<Option<CachedRevmBal>>) {
464        if let Some(queued) = self.bal_cache.remove(&block_hash) {
465            for tx in queued {
466                let _ = tx.send(res.clone());
467            }
468        }
469    }
470
471    /// Shrinks the queues but leaves some space for the next requests
472    fn shrink_queues(&mut self) {
473        let min_capacity = 2;
474        self.full_block_cache.shrink_to(min_capacity);
475        self.receipts_cache.shrink_to(min_capacity);
476        self.headers_cache.shrink_to(min_capacity);
477        self.bal_cache.shrink_to(min_capacity);
478    }
479
480    fn update_cached_metrics(&self) {
481        self.full_block_cache.update_cached_metrics();
482        self.receipts_cache.update_cached_metrics();
483        self.headers_cache.update_cached_metrics();
484        self.bal_cache.update_cached_metrics();
485    }
486}
487
488impl<Provider> Future for EthStateCacheService<Provider, Runtime>
489where
490    Provider: BlockReader + BalProvider + Clone + Unpin + 'static,
491{
492    type Output = ();
493
494    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
495        let this = self.get_mut();
496
497        loop {
498            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
499                // shrink queues if we don't have any work to do
500                this.shrink_queues();
501                return Poll::Pending;
502            };
503
504            match action {
505                None => {
506                    unreachable!("can't close")
507                }
508                Some(action) => {
509                    match action {
510                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
511                            let _ =
512                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
513                        }
514                        CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
515                            let block = this.full_block_cache.get(&block_hash).cloned();
516                            let receipts = this.receipts_cache.get(&block_hash).cloned();
517                            let _ = response_tx.send((block, receipts));
518                        }
519                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
520                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
521                                let _ = response_tx.send(Ok(Some(block)));
522                                continue
523                            }
524
525                            // block is not in the cache, request it if this is the first consumer
526                            if this.full_block_cache.queue(block_hash, response_tx) {
527                                let provider = this.provider.clone();
528                                let action_tx = this.action_tx.clone();
529                                let rate_limiter = this.rate_limiter.clone();
530                                let mut action_sender =
531                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
532                                this.action_task_spawner.spawn_blocking_task(async move {
533                                    // Acquire permit
534                                    let _permit = rate_limiter.acquire().await;
535                                    // Only look in the database to prevent situations where we
536                                    // looking up the tree is blocking
537                                    let block_sender = provider
538                                        .sealed_block_with_senders(
539                                            BlockHashOrNumber::Hash(block_hash),
540                                            TransactionVariant::WithHash,
541                                        )
542                                        .map(|maybe_block| maybe_block.map(Arc::new));
543                                    action_sender.send_block(block_sender);
544                                });
545                            }
546                        }
547                        CacheAction::GetReceipts { block_hash, response_tx } => {
548                            // check if block is cached
549                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
550                                let _ = response_tx.send(Ok(Some(receipts)));
551                                continue
552                            }
553
554                            // block is not in the cache, request it if this is the first consumer
555                            if this.receipts_cache.queue(block_hash, response_tx) {
556                                let provider = this.provider.clone();
557                                let action_tx = this.action_tx.clone();
558                                let rate_limiter = this.rate_limiter.clone();
559                                let mut action_sender =
560                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
561                                this.action_task_spawner.spawn_blocking_task(async move {
562                                    // Acquire permit
563                                    let _permit = rate_limiter.acquire().await;
564                                    let res = provider
565                                        .receipts_by_block(block_hash.into())
566                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));
567
568                                    action_sender.send_receipts(res);
569                                });
570                            }
571                        }
572                        CacheAction::GetHeader { block_hash, response_tx } => {
573                            // check if the header is cached
574                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
575                                let _ = response_tx.send(Ok(header));
576                                continue
577                            }
578
579                            // it's possible we have the entire block cached
580                            if let Some(block) = this.full_block_cache.get(&block_hash) {
581                                let _ = response_tx.send(Ok(block.clone_header()));
582                                continue
583                            }
584
585                            // header is not in the cache, request it if this is the first
586                            // consumer
587                            if this.headers_cache.queue(block_hash, response_tx) {
588                                let provider = this.provider.clone();
589                                let action_tx = this.action_tx.clone();
590                                let rate_limiter = this.rate_limiter.clone();
591                                let mut action_sender =
592                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
593                                this.action_task_spawner.spawn_blocking_task(async move {
594                                    // Acquire permit
595                                    let _permit = rate_limiter.acquire().await;
596                                    let header = provider.header(block_hash).and_then(|header| {
597                                        header.ok_or_else(|| {
598                                            ProviderError::HeaderNotFound(block_hash.into())
599                                        })
600                                    });
601                                    action_sender.send_header(header);
602                                });
603                            }
604                        }
605                        CacheAction::GetBal { block_hash, response_tx } => {
606                            if let Some(bal) = this.bal_cache.get(&block_hash).cloned() {
607                                let _ = response_tx.send(Ok(Some(bal)));
608                                continue
609                            }
610
611                            if this.bal_cache.queue(block_hash, response_tx) {
612                                let provider = this.provider.clone();
613                                let action_tx = this.action_tx.clone();
614                                let rate_limiter = this.rate_limiter.clone();
615                                let mut action_sender =
616                                    ActionSender::new(CacheKind::Bal, block_hash, action_tx);
617                                this.action_task_spawner.spawn_blocking_task(async move {
618                                    let _permit = rate_limiter.acquire().await;
619                                    let res = provider
620                                        .bal_store()
621                                        .revm_bal_by_hash(block_hash)
622                                        .map(|maybe_bal| maybe_bal.map(CachedRevmBal::new));
623                                    action_sender.send_bal(res);
624                                });
625                            }
626                        }
627                        CacheAction::ReceiptsResult { block_hash, res } => {
628                            this.on_new_receipts(block_hash, res);
629                        }
630                        CacheAction::BalResult { block_hash, res } => {
631                            this.on_new_bal(block_hash, res);
632                        }
633                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
634                            Ok(Some(block_with_senders)) => {
635                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
636                            }
637                            Ok(None) => {
638                                this.on_new_block(block_hash, Ok(None));
639                            }
640                            Err(e) => {
641                                this.on_new_block(block_hash, Err(e));
642                            }
643                        },
644                        CacheAction::HeaderResult { block_hash, res } => {
645                            let res = *res;
646                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
647                                // send the response to queued senders
648                                for tx in queued {
649                                    let _ = tx.send(res.clone());
650                                }
651                            }
652
653                            // cache good header
654                            if let Ok(data) = res {
655                                this.headers_cache.insert(block_hash, data);
656                            }
657                        }
658                        CacheAction::CacheNewCanonicalChain { chain_change } => {
659                            for block in chain_change.blocks {
660                                // Index transactions before caching the block
661                                this.index_block_transactions(&block);
662                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
663                            }
664
665                            for block_receipts in chain_change.receipts {
666                                this.on_new_receipts(
667                                    block_receipts.block_hash,
668                                    Ok(Some(Arc::new(block_receipts.receipts))),
669                                );
670                            }
671                        }
672                        CacheAction::RemoveReorgedChain { chain_change } => {
673                            for block in chain_change.blocks {
674                                let block_hash = block.hash();
675                                let header = block.clone_header();
676                                // Remove transaction index entries for reorged blocks
677                                this.remove_block_transactions(&block);
678                                this.on_reorg_block(block_hash, Ok(Some(block)));
679                                this.on_reorg_header(block_hash, Ok(header));
680                                this.on_reorg_bal(block_hash, Ok(None));
681                            }
682
683                            for block_receipts in chain_change.receipts {
684                                this.on_reorg_receipts(
685                                    block_receipts.block_hash,
686                                    Ok(Some(Arc::new(block_receipts.receipts))),
687                                );
688                            }
689                        }
690                        CacheAction::GetCachedParentBlocks {
691                            block_hash,
692                            max_blocks,
693                            response_tx,
694                        } => {
695                            let mut blocks = Vec::new();
696                            let mut current_hash = block_hash;
697
698                            // Start with the requested block
699                            while blocks.len() < max_blocks {
700                                if let Some(block) =
701                                    this.full_block_cache.get(&current_hash).cloned()
702                                {
703                                    // Get the parent hash for the next iteration
704                                    current_hash = block.header().parent_hash();
705                                    blocks.push(block);
706                                } else {
707                                    // Break the loop if we can't find the current block
708                                    break;
709                                }
710                            }
711
712                            let _ = response_tx.send(blocks);
713                        }
714                        CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
715                            let result =
716                                this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
717                                    let block = this.full_block_cache.get(block_hash).cloned()?;
718                                    let receipts = this.receipts_cache.get(block_hash).cloned();
719                                    Some(CachedTransaction::new(block, *idx, receipts))
720                                });
721                            let _ = response_tx.send(result);
722                        }
723                    };
724                    this.update_cached_metrics();
725                }
726            }
727        }
728    }
729}
730
/// All message variants sent through the channel
///
/// Three groups are declared here: `Get*` requests that carry a `response_tx` for the reply,
/// `*Result` messages that report the outcome of a fetch for a block hash (these are built by
/// [`ActionSender`]), and chain-change messages that insert or evict cached entries.
enum CacheAction<B: Block, R> {
    /// Request for the recovered block with hash `block_hash`.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request for the header of the block with hash `block_hash`.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request for the receipts of the block with hash `block_hash`.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Request for the BAL of the block with hash `block_hash`.
    GetBal {
        block_hash: B256,
        response_tx: BalResponseSender,
    },
    /// Request for a block whose reply type carries no provider error.
    ///
    /// NOTE(review): presumably answered from the cache without a provider fetch — confirm
    /// against the handler.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Like [`Self::GetCachedBlock`], but the reply pairs the block with its receipts; either
    /// half of the pair may be `None`.
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    /// Outcome of a block fetch, sent by `ActionSender::send_block` or, with an error, when an
    /// [`ActionSender`] of kind [`CacheKind::Block`] is dropped unused.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Outcome of a receipts fetch, sent by `ActionSender::send_receipts` or, with an error,
    /// when an [`ActionSender`] of kind [`CacheKind::Receipt`] is dropped unused.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Outcome of a header fetch, sent by `ActionSender::send_header` or, with an error, when
    /// an [`ActionSender`] of kind [`CacheKind::Header`] is dropped unused.
    HeaderResult {
        block_hash: B256,
        // NOTE(review): boxed, presumably to keep this enum small — confirm.
        res: Box<ProviderResult<B::Header>>,
    },
    /// Outcome of a BAL fetch, sent by `ActionSender::send_bal` or, with an error, when an
    /// [`ActionSender`] of kind [`CacheKind::Bal`] is dropped unused.
    BalResult {
        block_hash: B256,
        res: ProviderResult<Option<CachedRevmBal>>,
    },
    /// Newly committed blocks and receipts: the service indexes each block's transactions and
    /// then hands the blocks and receipts to its `on_new_block` / `on_new_receipts` handlers.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Reverted blocks and receipts: the service removes each block's transaction index
    /// entries and then runs its `on_reorg_*` handlers for the block, header, BAL and receipts.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Request for up to `max_blocks` cached blocks, starting at `block_hash` and following
    /// parent hashes; the walk stops at the first hash missing from the block cache.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Look up a transaction's cached data by its hash
    ///
    /// The reply is `None` unless the hash is indexed and its block is still in the block
    /// cache; cached receipts are attached when available.
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}
790
/// The receipts of a single block together with the hash of that block.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// Owned copy of the block's receipts.
    receipts: Vec<R>,
}
795
/// A change of the canonical chain
///
/// `ChainChange::new` fills both vectors pairwise from one iterator, so entry `i` of `receipts`
/// belongs to entry `i` of `blocks`.
struct ChainChange<B: Block, R> {
    /// The blocks of the changed chain segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// The receipts of those blocks, one entry per block.
    receipts: Vec<BlockReceipts<R>>,
}
801
802impl<B: Block, R: Clone> ChainChange<B, R> {
803    fn new<N>(chain: Arc<Chain<N>>) -> Self
804    where
805        N: NodePrimitives<Block = B, Receipt = R>,
806    {
807        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
808            .blocks_and_receipts()
809            .map(|(block, receipts)| {
810                let block_receipts =
811                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
812                (block.clone(), block_receipts)
813            })
814            .unzip();
815        Self { blocks, receipts }
816    }
817}
818
/// Identifier for the caches.
///
/// An [`ActionSender`] stores one of these so that its `Drop` impl knows which `*Result`
/// message to emit when no result was sent.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// Selects `CacheAction::BlockWithSendersResult`.
    Block,
    /// Selects `CacheAction::ReceiptsResult`.
    Receipt,
    /// Selects `CacheAction::HeaderResult`.
    Header,
    /// Selects `CacheAction::BalResult`.
    Bal,
}
827
/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
/// before a result could be sent.
///
/// This type wraps a sender and, in case the sender is still present on drop, emits an error
/// response.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Which `*Result` message the `Drop` impl falls back to.
    kind: CacheKind,
    /// Block hash attached to every message this sender emits.
    blockhash: B256,
    /// Channel to the service; taken (left as `None`) by the first `send_*` call so that at
    /// most one message is ever emitted.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
838
839impl<R: Send + Sync, B: Block> ActionSender<B, R> {
840    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
841        Self { kind, blockhash, tx: Some(tx) }
842    }
843
844    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
845        if let Some(tx) = self.tx.take() {
846            let _ = tx.send(CacheAction::BlockWithSendersResult {
847                block_hash: self.blockhash,
848                res: block_sender,
849            });
850        }
851    }
852
853    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
854        if let Some(tx) = self.tx.take() {
855            let _ =
856                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
857        }
858    }
859
860    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
861        if let Some(tx) = self.tx.take() {
862            let _ = tx.send(CacheAction::HeaderResult {
863                block_hash: self.blockhash,
864                res: Box::new(header),
865            });
866        }
867    }
868
869    fn send_bal(&mut self, bal: Result<Option<CachedRevmBal>, ProviderError>) {
870        if let Some(tx) = self.tx.take() {
871            let _ = tx.send(CacheAction::BalResult { block_hash: self.blockhash, res: bal });
872        }
873    }
874}
875impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
876    fn drop(&mut self) {
877        if let Some(tx) = self.tx.take() {
878            let msg = match self.kind {
879                CacheKind::Block => CacheAction::BlockWithSendersResult {
880                    block_hash: self.blockhash,
881                    res: Err(CacheServiceUnavailable.into()),
882                },
883                CacheKind::Receipt => CacheAction::ReceiptsResult {
884                    block_hash: self.blockhash,
885                    res: Err(CacheServiceUnavailable.into()),
886                },
887                CacheKind::Header => CacheAction::HeaderResult {
888                    block_hash: self.blockhash,
889                    res: Box::new(Err(CacheServiceUnavailable.into())),
890                },
891                CacheKind::Bal => CacheAction::BalResult {
892                    block_hash: self.blockhash,
893                    res: Err(CacheServiceUnavailable.into()),
894                },
895            };
896            let _ = tx.send(msg);
897        }
898    }
899}
900
901/// Awaits for new chain events and directly inserts them into the cache so they're available
902/// immediately before they need to be fetched from disk.
903///
904/// Reorged blocks are removed from the cache.
905pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
906    eth_state_cache: EthStateCache<N>,
907    mut events: St,
908) where
909    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
910{
911    while let Some(event) = events.next().await {
912        if let Some(reverted) = event.reverted() {
913            let chain_change = ChainChange::new(reverted);
914
915            let _ =
916                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
917        }
918
919        let chain_change = ChainChange::new(event.committed());
920
921        let _ =
922            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
923    }
924}
925
/// Cached decoded revm BAL.
///
/// The decoded BAL sits behind an [`Arc`], so the derived `Clone` only clones that pointer.
#[derive(Clone, Debug)]
pub(crate) struct CachedRevmBal(Arc<DecodedBal<Arc<RevmBal>>>);
929
930impl CachedRevmBal {
931    /// Creates a cached revm BAL from an owned decoded BAL.
932    #[inline]
933    fn new(bal: DecodedBal<Arc<RevmBal>>) -> Self {
934        Self(Arc::new(bal))
935    }
936}
937
938impl InMemorySize for CachedRevmBal {
939    fn size(&self) -> usize {
940        core::mem::size_of::<Self>() + decoded_revm_bal_size(&self.0)
941    }
942}
943
944fn decoded_revm_bal_size(bal: &DecodedBal<Arc<RevmBal>>) -> usize {
945    core::mem::size_of::<DecodedBal<Arc<RevmBal>>>() +
946        bal.as_raw().len() +
947        revm_bal_size(bal.as_bal())
948}
949
950fn revm_bal_size(bal: &Arc<RevmBal>) -> usize {
951    core::mem::size_of::<RevmBal>() +
952        bal.accounts.capacity() * core::mem::size_of::<(Address, RevmAccountBal)>() +
953        bal.accounts.values().map(revm_account_bal_heap_size).sum::<usize>()
954}
955
956fn revm_account_bal_heap_size(account: &RevmAccountBal) -> usize {
957    revm_account_info_bal_heap_size(&account.account_info) +
958        revm_storage_bal_heap_size(&account.storage)
959}
960
961fn revm_account_info_bal_heap_size(account_info: &RevmAccountInfoBal) -> usize {
962    revm_bal_writes_heap_size(&account_info.nonce, |_| 0) +
963        revm_bal_writes_heap_size(&account_info.balance, |_| 0) +
964        revm_bal_writes_heap_size(&account_info.code, revm_code_write_heap_size)
965}
966
967fn revm_storage_bal_heap_size(storage: &RevmStorageBal) -> usize {
968    storage.storage.len() * core::mem::size_of::<(StorageKey, RevmBalWrites<StorageValue>)>() +
969        storage
970            .storage
971            .values()
972            .map(|writes| revm_bal_writes_heap_size(writes, |_| 0))
973            .sum::<usize>()
974}
975
976fn revm_bal_writes_heap_size<T, F>(writes: &RevmBalWrites<T>, mut item_heap_size: F) -> usize
977where
978    T: PartialEq + Clone,
979    F: FnMut(&T) -> usize,
980{
981    writes.writes.capacity() * core::mem::size_of::<(u64, T)>() +
982        writes.writes.iter().map(|(_, item)| item_heap_size(item)).sum::<usize>()
983}
984
985fn revm_code_write_heap_size((_, bytecode): &(B256, Bytecode)) -> usize {
986    bytecode.bytes_ref().len()
987}
988
989#[cfg(test)]
990mod tests {
991    use super::*;
992    use alloy_consensus::{transaction::TransactionMeta, Header};
993    use alloy_eip7928::BlockAccessIndex;
994    use alloy_eips::{BlockHashOrNumber, NumHash};
995    use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes, Signature, TxHash, TxNumber};
996    use core::ops::{RangeBounds, RangeInclusive};
997    use reth_db_models::StoredBlockBodyIndices;
998    use reth_ethereum_primitives::{
999        Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned,
1000    };
1001    use reth_primitives_traits::{RecoveredBlock, SealedHeader};
1002    use reth_storage_api::{
1003        noop::NoopProvider, BalProvider, BalStore, BalStoreHandle, BlockBodyIndicesProvider,
1004        BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider, ReceiptProvider,
1005        TransactionVariant, TransactionsProvider,
1006    };
1007    use std::sync::atomic::{AtomicUsize, Ordering};
1008
1009    fn test_service() -> EthStateCacheService<NoopProvider, Runtime> {
1010        let (_cache, service) = EthStateCache::<EthPrimitives>::create(
1011            NoopProvider::default(),
1012            Runtime::test(),
1013            EthStateCacheConfig {
1014                max_blocks: 4,
1015                max_receipts: 4,
1016                max_headers: 4,
1017                max_bals: 4,
1018                max_concurrent_db_requests: 1,
1019                max_cached_tx_hashes: 16,
1020            },
1021        );
1022        service
1023    }
1024
1025    fn test_decoded_revm_bal() -> DecodedBal<Arc<RevmBal>> {
1026        DecodedBal::new(Arc::new(RevmBal::default()), Bytes::from_static(&[0xc0]))
1027    }
1028
1029    fn test_block() -> RecoveredBlock<Block> {
1030        RecoveredBlock::new_unhashed(
1031            Block {
1032                header: Header { number: 1, ..Default::default() },
1033                body: BlockBody {
1034                    transactions: vec![TransactionSigned::new_unhashed(
1035                        Transaction::Legacy(Default::default()),
1036                        Signature::test_signature(),
1037                    )],
1038                    ..Default::default()
1039                },
1040            },
1041            vec![Address::ZERO],
1042        )
1043    }
1044
1045    #[test]
1046    fn reorg_evicts_cached_headers() {
1047        let mut service = test_service();
1048        let block_hash = B256::repeat_byte(0x11);
1049
1050        assert!(service
1051            .headers_cache
1052            .insert(block_hash, Header { number: 42, ..Default::default() }));
1053        assert!(service.headers_cache.get(&block_hash).is_some());
1054
1055        service.on_reorg_header(block_hash, Ok(Header { number: 7, ..Default::default() }));
1056
1057        assert!(service.headers_cache.get(&block_hash).is_none());
1058    }
1059
1060    #[test]
1061    fn reorg_forwards_header_to_queued_requests() {
1062        let mut service = test_service();
1063        let block_hash = B256::repeat_byte(0x22);
1064        let (response_tx, mut response_rx) = oneshot::channel();
1065        let header = Header { number: 7, ..Default::default() };
1066
1067        assert!(service.headers_cache.queue(block_hash, response_tx));
1068
1069        service.on_reorg_header(block_hash, Ok(header));
1070
1071        let header =
1072            response_rx.try_recv().expect("queued header response").expect("header result");
1073
1074        assert_eq!(header.number, 7);
1075    }
1076
1077    #[test]
1078    fn reorg_removes_tx_hash_index_entries_unconditionally() {
1079        let mut service = test_service();
1080        let block = test_block();
1081        let tx_hash = *block.body().transactions().next().expect("test transaction").tx_hash();
1082
1083        service.tx_hash_index.insert(tx_hash, (B256::repeat_byte(0x33), 0));
1084
1085        service.remove_block_transactions(&block);
1086
1087        assert!(service.tx_hash_index.get(&tx_hash).is_none());
1088    }
1089
1090    #[test]
1091    fn reorg_evicts_cached_bal() {
1092        let mut service = test_service();
1093        let block_hash = B256::repeat_byte(0x44);
1094
1095        assert!(service.bal_cache.insert(block_hash, CachedRevmBal::new(test_decoded_revm_bal())));
1096        assert!(service.bal_cache.get(&block_hash).is_some());
1097
1098        service.on_reorg_bal(block_hash, Ok(None));
1099
1100        assert!(service.bal_cache.get(&block_hash).is_none());
1101    }
1102
1103    #[test]
1104    fn reorg_forwards_bal_to_queued_requests() {
1105        let mut service = test_service();
1106        let block_hash = B256::repeat_byte(0x55);
1107        let (response_tx, mut response_rx) = oneshot::channel();
1108        let bal = CachedRevmBal::new(test_decoded_revm_bal());
1109
1110        assert!(service.bal_cache.queue(block_hash, response_tx));
1111
1112        service.on_reorg_bal(block_hash, Ok(Some(bal)));
1113
1114        let bal = response_rx.try_recv().expect("queued BAL response").expect("BAL result");
1115
1116        assert!(bal.is_some());
1117    }
1118
1119    #[test]
1120    fn cached_revm_bal_size_accounts_for_nested_allocations() {
1121        let mut account = RevmAccountBal::default();
1122        account.account_info.nonce.writes.push((BlockAccessIndex::new(1), 1));
1123        account
1124            .account_info
1125            .balance
1126            .writes
1127            .push((BlockAccessIndex::new(2), StorageValue::from(1u64)));
1128        account.account_info.code.writes.push((
1129            BlockAccessIndex::new(3),
1130            (B256::repeat_byte(0xaa), Bytecode::new_raw(Bytes::from_static(&[0x60, 0x00]))),
1131        ));
1132        account.storage.storage.insert(
1133            StorageKey::from(1u64),
1134            RevmBalWrites::new(vec![(BlockAccessIndex::new(4), StorageValue::from(2u64))]),
1135        );
1136
1137        let mut bal = RevmBal::default();
1138        bal.accounts.insert(Address::ZERO, account);
1139
1140        let raw = Bytes::from_static(&[0xc0, 0x01, 0x02]);
1141        let previous_estimate = core::mem::size_of::<CachedRevmBal>() +
1142            core::mem::size_of::<DecodedBal<Arc<RevmBal>>>() +
1143            raw.len() +
1144            core::mem::size_of::<RevmBal>();
1145        assert!(CachedRevmBal::new(DecodedBal::new(Arc::new(bal), raw)).size() > previous_estimate);
1146    }
1147
1148    #[tokio::test]
1149    async fn get_bal_uses_cached_revm_bal() {
1150        let fetches = Arc::new(AtomicUsize::default());
1151        let provider = TestBalProvider::new(fetches.clone());
1152        let cache = EthStateCache::<EthPrimitives>::spawn_with(
1153            provider,
1154            EthStateCacheConfig {
1155                max_blocks: 0,
1156                max_receipts: 0,
1157                max_headers: 0,
1158                max_bals: 4,
1159                max_concurrent_db_requests: 1,
1160                max_cached_tx_hashes: 0,
1161            },
1162            Runtime::test(),
1163        );
1164        let block_hash = B256::repeat_byte(0x66);
1165
1166        assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1167        assert!(cache.get_bal(block_hash).await.unwrap().is_some());
1168
1169        assert_eq!(fetches.load(Ordering::SeqCst), 1);
1170    }
1171
1172    #[tokio::test]
1173    async fn concurrent_get_bal_requests_share_fetch() {
1174        let fetches = Arc::new(AtomicUsize::default());
1175        let provider = TestBalProvider::new(fetches.clone());
1176        let cache = EthStateCache::<EthPrimitives>::spawn_with(
1177            provider,
1178            EthStateCacheConfig {
1179                max_blocks: 0,
1180                max_receipts: 0,
1181                max_headers: 0,
1182                max_bals: 4,
1183                max_concurrent_db_requests: 1,
1184                max_cached_tx_hashes: 0,
1185            },
1186            Runtime::test(),
1187        );
1188        let block_hash = B256::repeat_byte(0x77);
1189
1190        let (first, second) = tokio::join!(cache.get_bal(block_hash), cache.get_bal(block_hash));
1191
1192        assert!(first.unwrap().is_some());
1193        assert!(second.unwrap().is_some());
1194        assert_eq!(fetches.load(Ordering::SeqCst), 1);
1195    }
1196
    /// Test provider that holds a BAL store handle; every other provider trait implemented
    /// for it in this module returns empty results.
    #[derive(Clone, Debug, Default)]
    struct TestBalProvider {
        /// Handle returned by the `BalProvider::bal_store` impl.
        bal_store: BalStoreHandle,
    }
1201
1202    impl TestBalProvider {
1203        fn new(fetches: Arc<AtomicUsize>) -> Self {
1204            Self { bal_store: BalStoreHandle::new(TestBalStore { fetches }) }
1205        }
1206    }
1207
    // Exposes the handle stored in the provider.
    impl BalProvider for TestBalProvider {
        fn bal_store(&self) -> &BalStoreHandle {
            &self.bal_store
        }
    }
1213
    /// BAL store for tests that counts how often `revm_bal_by_hash` is called.
    #[derive(Debug)]
    struct TestBalStore {
        /// Incremented by one on every `revm_bal_by_hash` call.
        fetches: Arc<AtomicUsize>,
    }
1218
1219    impl BalStore for TestBalStore {
1220        fn insert(&self, _num_hash: NumHash, _bal: reth_storage_api::RawBal) -> ProviderResult<()> {
1221            Ok(())
1222        }
1223
1224        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
1225            Ok(0)
1226        }
1227
1228        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
1229            Ok(block_hashes.iter().map(|_| None).collect())
1230        }
1231
1232        fn revm_bal_by_hash(
1233            &self,
1234            _block_hash: BlockHash,
1235        ) -> ProviderResult<Option<DecodedBal<Arc<RevmBal>>>> {
1236            self.fetches.fetch_add(1, Ordering::SeqCst);
1237            Ok(Some(test_decoded_revm_bal()))
1238        }
1239
1240        fn bal_stream(&self) -> reth_storage_api::BalNotificationStream {
1241            reth_storage_api::NoopBalStore.bal_stream()
1242        }
1243    }
1244
    // Stub: the test provider knows no block hashes.
    impl BlockHashReader for TestBalProvider {
        fn block_hash(&self, _number: BlockNumber) -> ProviderResult<Option<B256>> {
            Ok(None)
        }

        fn canonical_hashes_range(
            &self,
            _start: BlockNumber,
            _end: BlockNumber,
        ) -> ProviderResult<Vec<B256>> {
            Ok(Vec::new())
        }
    }
1258
    // Stub: reports default chain info, block number 0 and no hash-to-number mapping.
    impl BlockNumReader for TestBalProvider {
        fn chain_info(&self) -> ProviderResult<reth_chainspec::ChainInfo> {
            Ok(reth_chainspec::ChainInfo::default())
        }

        fn best_block_number(&self) -> ProviderResult<BlockNumber> {
            Ok(0)
        }

        fn last_block_number(&self) -> ProviderResult<BlockNumber> {
            Ok(0)
        }

        fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
            Ok(None)
        }
    }
1276
    // Stub: every header lookup comes back empty.
    impl HeaderProvider for TestBalProvider {
        type Header = Header;

        fn header(&self, _block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
            Ok(None)
        }

        fn header_by_number(&self, _num: u64) -> ProviderResult<Option<Self::Header>> {
            Ok(None)
        }

        fn headers_range(
            &self,
            _range: impl RangeBounds<BlockNumber>,
        ) -> ProviderResult<Vec<Self::Header>> {
            Ok(Vec::new())
        }

        fn sealed_header(
            &self,
            _number: BlockNumber,
        ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
            Ok(None)
        }

        // The predicate is never invoked because there are no headers to test.
        fn sealed_headers_while(
            &self,
            _range: impl RangeBounds<BlockNumber>,
            _predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
        ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
            Ok(Vec::new())
        }
    }
1310
    // Stub: no block body indices are available.
    impl BlockBodyIndicesProvider for TestBalProvider {
        fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
            Ok(None)
        }

        fn block_body_indices_range(
            &self,
            _range: RangeInclusive<BlockNumber>,
        ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
            Ok(Vec::new())
        }
    }
1323
    // Stub: every transaction or sender lookup comes back empty.
    impl TransactionsProvider for TestBalProvider {
        type Transaction = TransactionSigned;

        fn transaction_id(&self, _tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
            Ok(None)
        }

        fn transaction_by_id(&self, _id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
            Ok(None)
        }

        fn transaction_by_id_unhashed(
            &self,
            _id: TxNumber,
        ) -> ProviderResult<Option<Self::Transaction>> {
            Ok(None)
        }

        fn transaction_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
            Ok(None)
        }

        fn transaction_by_hash_with_meta(
            &self,
            _hash: TxHash,
        ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
            Ok(None)
        }

        fn transactions_by_block(
            &self,
            _block: BlockHashOrNumber,
        ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
            Ok(None)
        }

        fn transactions_by_block_range(
            &self,
            _range: impl RangeBounds<BlockNumber>,
        ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
            Ok(Vec::new())
        }

        fn transactions_by_tx_range(
            &self,
            _range: impl RangeBounds<TxNumber>,
        ) -> ProviderResult<Vec<Self::Transaction>> {
            Ok(Vec::new())
        }

        fn senders_by_tx_range(
            &self,
            _range: impl RangeBounds<TxNumber>,
        ) -> ProviderResult<Vec<Address>> {
            Ok(Vec::new())
        }

        fn transaction_sender(&self, _id: TxNumber) -> ProviderResult<Option<Address>> {
            Ok(None)
        }
    }
1385
    // Stub: every receipt lookup comes back empty.
    impl ReceiptProvider for TestBalProvider {
        type Receipt = Receipt;

        fn receipt(&self, _id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
            Ok(None)
        }

        fn receipt_by_hash(&self, _hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
            Ok(None)
        }

        fn receipts_by_block(
            &self,
            _block: BlockHashOrNumber,
        ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
            Ok(None)
        }

        fn receipts_by_tx_range(
            &self,
            _range: impl RangeBounds<TxNumber>,
        ) -> ProviderResult<Vec<Self::Receipt>> {
            Ok(Vec::new())
        }

        fn receipts_by_block_range(
            &self,
            _block_range: RangeInclusive<BlockNumber>,
        ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
            Ok(Vec::new())
        }
    }
1418
    // Stub: every block lookup comes back empty.
    impl BlockReader for TestBalProvider {
        type Block = Block;

        fn find_block_by_hash(
            &self,
            _hash: B256,
            _source: BlockSource,
        ) -> ProviderResult<Option<Self::Block>> {
            Ok(None)
        }

        fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
            Ok(None)
        }

        fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
            Ok(None)
        }

        fn pending_block_and_receipts(
            &self,
        ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
            Ok(None)
        }

        fn recovered_block(
            &self,
            _id: BlockHashOrNumber,
            _transaction_kind: TransactionVariant,
        ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
            Ok(None)
        }

        fn sealed_block_with_senders(
            &self,
            _id: BlockHashOrNumber,
            _transaction_kind: TransactionVariant,
        ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
            Ok(None)
        }

        fn block_range(
            &self,
            _range: RangeInclusive<BlockNumber>,
        ) -> ProviderResult<Vec<Self::Block>> {
            Ok(Vec::new())
        }

        fn block_with_senders_range(
            &self,
            _range: RangeInclusive<BlockNumber>,
        ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
            Ok(Vec::new())
        }

        fn recovered_block_range(
            &self,
            _range: RangeInclusive<BlockNumber>,
        ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
            Ok(Vec::new())
        }

        fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
            Ok(None)
        }
    }
1485}