reth_rpc_eth_types/cache/mod.rs

//! Async caching support for eth RPC

use super::{EthStateCacheConfig, MultiConsumerLruCache};
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{TxHash, B256};
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter, LruMap};
use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedSender},
    oneshot, Semaphore,
};
use tokio_stream::wrappers::UnboundedReceiverStream;

pub mod config;
pub mod db;
pub mod metrics;
pub mod multi_consumer;

/// The type that can send the response to a requested [`RecoveredBlock`]
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// The type that can send the response to a requested header
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;

/// Provides async access to cached eth data
///
/// This is the frontend for the async caching service which manages cached data on a different
/// task.
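///
/// # Example
///
/// A minimal sketch of spawning the cache and requesting a block; the `provider`, `block_hash`
/// and the `N: NodePrimitives` type are assumed to come from the surrounding node context:
///
/// ```ignore
/// // Spawn the cache service on the tokio runtime with the default limits.
/// let cache = EthStateCache::<N>::spawn(provider, EthStateCacheConfig::default());
///
/// // Request a recovered block; this hits the LRU cache first and falls back to the provider.
/// let block = cache.get_recovered_block(block_hash).await?;
/// ```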
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}

impl<N: NodePrimitives> Clone for EthStateCache<N> {
    fn clone(&self) -> Self {
        Self { to_service: self.to_service.clone() }
    }
}

impl<N: NodePrimitives> EthStateCache<N> {
    /// Creates and returns both the [`EthStateCache`] frontend and the memory-bound service.
    fn create<Provider, Tasks>(
        provider: Provider,
        action_task_spawner: Tasks,
        max_blocks: u32,
        max_receipts: u32,
        max_headers: u32,
        max_concurrent_db_operations: usize,
        max_cached_tx_hashes: u32,
    ) -> (Self, EthStateCacheService<Provider, Tasks>)
    where
        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
    {
        let (to_service, rx) = unbounded_channel();

        let service = EthStateCacheService {
            provider,
            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
            headers_cache: HeaderLruCache::new(max_headers, "headers"),
            action_tx: to_service.clone(),
            action_rx: UnboundedReceiverStream::new(rx),
            action_task_spawner,
            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
            tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
        };
        let cache = Self { to_service };
        (cache, service)
    }

    /// Creates a new async LRU-backed cache service and spawns it as a new task via
    /// [`tokio::spawn`].
    ///
    /// See also [`Self::spawn_with`]
    pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
    where
        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
    {
        Self::spawn_with(provider, config, TokioTaskExecutor::default())
    }

    /// Creates a new async LRU-backed cache service and spawns it as a new task via the given
    /// spawner.
    ///
    /// The cache sizes are limited by the maximum entry counts in the given config.
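    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `provider` implements [`BlockReader`] for the node's
    /// primitives `N`; the executor here is the plain [`TokioTaskExecutor`]:
    ///
    /// ```ignore
    /// let cache = EthStateCache::<N>::spawn_with(
    ///     provider,
    ///     EthStateCacheConfig::default(),
    ///     TokioTaskExecutor::default(),
    /// );
    /// ```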
    pub fn spawn_with<Provider, Tasks>(
        provider: Provider,
        config: EthStateCacheConfig,
        executor: Tasks,
    ) -> Self
    where
        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
        Tasks: TaskSpawner + Clone + 'static,
    {
        let EthStateCacheConfig {
            max_blocks,
            max_receipts,
            max_headers,
            max_concurrent_db_requests,
            max_cached_tx_hashes,
        } = config;
        let (this, service) = Self::create(
            provider,
            executor.clone(),
            max_blocks,
            max_receipts,
            max_headers,
            max_concurrent_db_requests,
            max_cached_tx_hashes,
        );
        executor.spawn_critical_task("eth state cache", Box::pin(service));
        this
    }

    /// Requests the [`RecoveredBlock`] for the block hash
    ///
    /// Returns `None` if the block does not exist.
    pub async fn get_recovered_block(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable)?
    }

    /// Requests the receipts for the block hash
    ///
    /// Returns `None` if the block was not found.
    pub async fn get_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable)?
    }

    /// Fetches both receipts and block for the given block hash.
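    ///
    /// A minimal usage sketch; the `cache` and `block_hash` values are assumed to exist in the
    /// caller's context:
    ///
    /// ```ignore
    /// if let Some((block, receipts)) = cache.get_block_and_receipts(block_hash).await? {
    ///     // one receipt per transaction in the block
    ///     let _ = (block.hash(), receipts.len());
    /// }
    /// ```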
    pub async fn get_block_and_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
        let block = self.get_recovered_block(block_hash);
        let receipts = self.get_receipts(block_hash);

        let (block, receipts) = futures::try_join!(block, receipts)?;

        Ok(block.zip(receipts))
    }

    /// Retrieves the receipts for the given block hash, along with the block itself if it is
    /// already in the cache; otherwise only the receipts are returned.
    pub async fn get_receipts_and_maybe_block(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });

        let receipts = self.get_receipts(block_hash);

        let (receipts, block) = futures::join!(receipts, rx);

        let block = block.map_err(|_| CacheServiceUnavailable)?;
        Ok(receipts?.map(|r| (r, block)))
    }

    /// Retrieves both block and receipts from cache if available.
    pub async fn maybe_cached_block_and_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self
            .to_service
            .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable.into())
    }

    /// Streams cached receipts and blocks for a list of block hashes, preserving input order.
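    ///
    /// A minimal consumption sketch; the `cache` and `hashes` values are assumed to exist in the
    /// caller's context:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut stream = std::pin::pin!(cache.get_receipts_and_maybe_block_stream(hashes));
    /// while let Some(res) = stream.next().await {
    ///     if let Some((receipts, maybe_block)) = res? {
    ///         // receipts are always present here; the block only if it was cached
    ///         let _ = (receipts.len(), maybe_block.is_some());
    ///     }
    /// }
    /// ```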
    #[allow(clippy::type_complexity)]
    pub fn get_receipts_and_maybe_block_stream<'a>(
        &'a self,
        hashes: Vec<B256>,
    ) -> impl Stream<
        Item = ProviderResult<
            Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
        >,
    > + 'a {
        let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));

        futures.collect::<FuturesOrdered<_>>()
    }

    /// Requests the header for the given hash.
    ///
    /// Returns an error if the header is not found.
    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable)?
    }

    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
    /// and traversing down through parent hashes. Returns blocks in descending order (newest
    /// first).
    ///
    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
    /// cache without making separate database requests.
    ///
    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
    /// with at least one block.
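    ///
    /// A minimal sketch; the `cache` and `tip_hash` values are assumed to exist in the caller's
    /// context:
    ///
    /// ```ignore
    /// if let Some(blocks) = cache.get_cached_parent_blocks(tip_hash, 16).await {
    ///     // blocks[0] is the requested block, followed by its cached ancestors
    ///     let _ = blocks.len();
    /// }
    /// ```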
    pub async fn get_cached_parent_blocks(
        &self,
        block_hash: B256,
        max_blocks: usize,
    ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
            block_hash,
            max_blocks,
            response_tx,
        });

        let blocks = rx.await.unwrap_or_default();
        if blocks.is_empty() {
            None
        } else {
            Some(blocks)
        }
    }

    /// Looks up a transaction by its hash in the cache index.
    ///
    /// Returns the cached block, transaction index, and optionally receipts if the transaction
    /// is in a cached block.
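    ///
    /// A minimal sketch; the `cache` and `tx_hash` values are assumed to exist in the caller's
    /// context:
    ///
    /// ```ignore
    /// if let Some(cached) = cache.get_transaction_by_hash(tx_hash).await {
    ///     // the containing block, the tx index and (maybe) receipts come from the cache,
    ///     // without touching the database
    ///     let _ = cached;
    /// }
    /// ```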
    pub async fn get_transaction_by_hash(
        &self,
        tx_hash: TxHash,
    ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
        rx.await.ok()?
    }
}

/// Thrown when the cache service task has been dropped.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;

impl From<CacheServiceUnavailable> for ProviderError {
    fn from(err: CacheServiceUnavailable) -> Self {
        Self::other(err)
    }
}

/// A task that manages caches for data required by the `eth` rpc implementation.
///
/// It provides a caching layer on top of the given
/// [`StateProvider`](reth_storage_api::StateProvider) and keeps data fetched via the provider in
/// memory in an LRU cache. If the requested data is missing from the cache, it is fetched and
/// inserted into the cache afterwards. While fetching data from disk is sync, this service is
/// async since requests and data are shared via channels.
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [`EthStateCache`] via a channel. If the requested data is not cached, it spawns a new task
/// that does the IO and sends the result back to the service. This way the caching service only
/// handles messages and LRU lookups and never performs blocking IO.
///
/// Caution: The channel for the data is _unbounded_; it is assumed that this is mainly used by
/// `reth_rpc::EthApi`, which is typically invoked by the RPC server, which already uses
/// permits to limit concurrent requests.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
> where
    Provider: BlockReader,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
{
    /// The type used to look up data from disk
    provider: Provider,
    /// The LRU cache for full blocks grouped by their block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// The LRU cache for block receipts grouped by the block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// The LRU cache for headers.
    ///
    /// Headers are cached because they are required to populate the environment for execution
    /// (evm).
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Sender half of the action channel.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// The type that's used to spawn tasks that do the actual work
    action_task_spawner: Tasks,
    /// Rate limiter for spawned fetch tasks.
    ///
    /// This restricts the maximum number of concurrent fetch tasks.
    rate_limiter: Arc<Semaphore>,
    /// LRU index mapping transaction hashes to their block hash and index within the block.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}

impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
where
    Provider: BlockReader + Clone + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    /// Indexes all transactions in a block by transaction hash.
    fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
        let block_hash = block.hash();
        for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
            self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
        }
    }

    /// Removes transaction index entries for a reorged block.
    ///
    /// Only removes entries that still point to this block, preserving mappings for transactions
    /// that were re-mined in a new canonical block.
    fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
        let block_hash = block.hash();
        for tx in block.body().transactions() {
            if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
                *mapped_hash == block_hash
            {
                self.tx_hash_index.remove(tx.tx_hash());
            }
        }
    }

    fn on_new_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
    ) {
        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }

        // cache good block
        if let Ok(Some(block)) = res {
            self.full_block_cache.insert(block_hash, block);
        }
    }

    fn on_new_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }

        // cache good receipts
        if let Ok(Some(receipts)) = res {
            self.receipts_cache.insert(block_hash, receipts);
        }
    }

    fn on_reorg_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
    ) {
        let res = res.map(|b| b.map(Arc::new));
        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }
    }

    fn on_reorg_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }
    }

    /// Shrinks the queues but leaves some space for the next requests
    fn shrink_queues(&mut self) {
        let min_capacity = 2;
        self.full_block_cache.shrink_to(min_capacity);
        self.receipts_cache.shrink_to(min_capacity);
        self.headers_cache.shrink_to(min_capacity);
    }

    fn update_cached_metrics(&self) {
        self.full_block_cache.update_cached_metrics();
        self.receipts_cache.update_cached_metrics();
        self.headers_cache.update_cached_metrics();
    }
}

impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
where
    Provider: BlockReader + Clone + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
                // shrink queues if we don't have any work to do
                this.shrink_queues();
                return Poll::Pending;
            };

            match action {
                None => {
                    unreachable!("can't close")
                }
                Some(action) => {
                    match action {
                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
                            let _ =
                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
                        }
                        CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
                            let block = this.full_block_cache.get(&block_hash).cloned();
                            let receipts = this.receipts_cache.get(&block_hash).cloned();
                            let _ = response_tx.send((block, receipts));
                        }
                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(Some(block)));
                                continue
                            }

                            // block is not in the cache, request it if this is the first consumer
                            if this.full_block_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                let mut action_sender =
                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
                                this.action_task_spawner.spawn_blocking_task(Box::pin(
                                    async move {
                                        // Acquire permit
                                        let _permit = rate_limiter.acquire().await;
                                        // Only look in the database to prevent situations where
                                        // looking up the tree is blocking
                                        let block_sender = provider
                                            .sealed_block_with_senders(
                                                BlockHashOrNumber::Hash(block_hash),
                                                TransactionVariant::WithHash,
                                            )
                                            .map(|maybe_block| maybe_block.map(Arc::new));
                                        action_sender.send_block(block_sender);
                                    },
                                ));
                            }
                        }
                        CacheAction::GetReceipts { block_hash, response_tx } => {
                            // check if receipts are cached
                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(Some(receipts)));
                                continue
                            }

                            // receipts are not in the cache, request them if this is the first
                            // consumer
                            if this.receipts_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                let mut action_sender =
                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
                                this.action_task_spawner.spawn_blocking_task(Box::pin(
                                    async move {
                                        // Acquire permit
                                        let _permit = rate_limiter.acquire().await;
                                        let res = provider
                                            .receipts_by_block(block_hash.into())
                                            .map(|maybe_receipts| maybe_receipts.map(Arc::new));

                                        action_sender.send_receipts(res);
                                    },
                                ));
                            }
                        }
                        CacheAction::GetHeader { block_hash, response_tx } => {
                            // check if the header is cached
                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(header));
                                continue
                            }

                            // it's possible we have the entire block cached
                            if let Some(block) = this.full_block_cache.get(&block_hash) {
                                let _ = response_tx.send(Ok(block.clone_header()));
                                continue
                            }

                            // header is not in the cache, request it if this is the first
                            // consumer
                            if this.headers_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                let mut action_sender =
                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
                                this.action_task_spawner.spawn_blocking_task(Box::pin(
                                    async move {
                                        // Acquire permit
                                        let _permit = rate_limiter.acquire().await;
                                        let header =
                                            provider.header(block_hash).and_then(|header| {
                                                header.ok_or_else(|| {
                                                    ProviderError::HeaderNotFound(block_hash.into())
                                                })
                                            });
                                        action_sender.send_header(header);
                                    },
                                ));
                            }
                        }
                        CacheAction::ReceiptsResult { block_hash, res } => {
                            this.on_new_receipts(block_hash, res);
                        }
                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
                            Ok(Some(block_with_senders)) => {
                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
                            }
                            Ok(None) => {
                                this.on_new_block(block_hash, Ok(None));
                            }
                            Err(e) => {
                                this.on_new_block(block_hash, Err(e));
                            }
                        },
                        CacheAction::HeaderResult { block_hash, res } => {
                            let res = *res;
                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
                                // send the response to queued senders
                                for tx in queued {
                                    let _ = tx.send(res.clone());
                                }
                            }

                            // cache good header
                            if let Ok(data) = res {
                                this.headers_cache.insert(block_hash, data);
                            }
                        }
                        CacheAction::CacheNewCanonicalChain { chain_change } => {
                            for block in chain_change.blocks {
                                // Index transactions before caching the block
                                this.index_block_transactions(&block);
                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
                            }

                            for block_receipts in chain_change.receipts {
                                this.on_new_receipts(
                                    block_receipts.block_hash,
                                    Ok(Some(Arc::new(block_receipts.receipts))),
                                );
                            }
                        }
                        CacheAction::RemoveReorgedChain { chain_change } => {
                            for block in chain_change.blocks {
                                // Remove transaction index entries for reorged blocks
                                this.remove_block_transactions(&block);
                                this.on_reorg_block(block.hash(), Ok(Some(block)));
                            }

                            for block_receipts in chain_change.receipts {
                                this.on_reorg_receipts(
                                    block_receipts.block_hash,
                                    Ok(Some(Arc::new(block_receipts.receipts))),
                                );
                            }
                        }
                        CacheAction::GetCachedParentBlocks {
                            block_hash,
                            max_blocks,
                            response_tx,
                        } => {
                            let mut blocks = Vec::new();
                            let mut current_hash = block_hash;

                            // Start with the requested block
                            while blocks.len() < max_blocks {
                                if let Some(block) =
                                    this.full_block_cache.get(&current_hash).cloned()
                                {
                                    // Get the parent hash for the next iteration
                                    current_hash = block.header().parent_hash();
                                    blocks.push(block);
                                } else {
                                    // Break the loop if we can't find the current block
                                    break;
                                }
                            }

                            let _ = response_tx.send(blocks);
                        }
                        CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
                            let result =
                                this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
                                    let block = this.full_block_cache.get(block_hash).cloned()?;
                                    let receipts = this.receipts_cache.get(block_hash).cloned();
                                    Some(CachedTransaction::new(block, *idx, receipts))
                                });
                            let _ = response_tx.send(result);
                        }
                    };
                    this.update_cached_metrics();
                }
            }
        }
    }
}

/// All message variants sent through the channel
enum CacheAction<B: Block, R> {
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Look up a transaction's cached data by its hash
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}

struct BlockReceipts<R> {
    block_hash: B256,
    receipts: Vec<R>,
}

/// A change of the canonical chain
struct ChainChange<B: Block, R> {
    blocks: Vec<RecoveredBlock<B>>,
    receipts: Vec<BlockReceipts<R>>,
}

impl<B: Block, R: Clone> ChainChange<B, R> {
    fn new<N>(chain: Arc<Chain<N>>) -> Self
    where
        N: NodePrimitives<Block = B, Receipt = R>,
    {
        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
            .blocks_and_receipts()
            .map(|(block, receipts)| {
                let block_receipts =
                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
                (block.clone(), block_receipts)
            })
            .unzip();
        Self { blocks, receipts }
    }
}

/// Identifier for the caches.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    Block,
    Receipt,
    Header,
}

/// Drop aware sender struct that ensures a response is always emitted, even if the db task panics
/// before a result could be sent.
///
/// This type wraps a sender and, if the sender is still present on drop, emits an error response.
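///
/// A minimal sketch of the intended pattern, mirroring the `GetBlockWithSenders` branch above;
/// the `provider`, `block_hash` and `action_tx` values are assumed to exist in the caller's
/// context:
///
/// ```ignore
/// let mut sender = ActionSender::new(CacheKind::Block, block_hash, action_tx);
/// // If the provider call below panics, `Drop` still delivers an error result to the service.
/// let res = provider
///     .sealed_block_with_senders(BlockHashOrNumber::Hash(block_hash), TransactionVariant::WithHash)
///     .map(|maybe_block| maybe_block.map(Arc::new));
/// sender.send_block(res);
/// ```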
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    kind: CacheKind,
    blockhash: B256,
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}

impl<R: Send + Sync, B: Block> ActionSender<B, R> {
    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
        Self { kind, blockhash, tx: Some(tx) }
    }

    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(CacheAction::BlockWithSendersResult {
                block_hash: self.blockhash,
                res: block_sender,
            });
        }
    }

    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
        if let Some(tx) = self.tx.take() {
            let _ =
                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
        }
    }

    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(CacheAction::HeaderResult {
                block_hash: self.blockhash,
                res: Box::new(header),
            });
        }
    }
}

impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
    fn drop(&mut self) {
        if let Some(tx) = self.tx.take() {
            let msg = match self.kind {
                CacheKind::Block => CacheAction::BlockWithSendersResult {
                    block_hash: self.blockhash,
                    res: Err(CacheServiceUnavailable.into()),
                },
                CacheKind::Receipt => CacheAction::ReceiptsResult {
                    block_hash: self.blockhash,
                    res: Err(CacheServiceUnavailable.into()),
                },
                CacheKind::Header => CacheAction::HeaderResult {
                    block_hash: self.blockhash,
                    res: Box::new(Err(CacheServiceUnavailable.into())),
                },
            };
            let _ = tx.send(msg);
        }
    }
}

/// Awaits new chain events and directly inserts them into the cache so they're available
/// immediately, before they would otherwise have to be fetched from disk.
///
/// Reorged blocks are removed from the cache.
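///
/// A minimal wiring sketch, assuming a `provider` handle that exposes canonical state
/// notifications (e.g. via reth's `CanonStateSubscriptions`, which is not imported in this
/// module) and an existing `cache` frontend:
///
/// ```ignore
/// let events = provider.canonical_state_stream();
/// tokio::spawn(cache_new_blocks_task(cache.clone(), events));
/// ```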
pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
    eth_state_cache: EthStateCache<N>,
    mut events: St,
) where
    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
{
    while let Some(event) = events.next().await {
        if let Some(reverted) = event.reverted() {
            let chain_change = ChainChange::new(reverted);

            let _ =
                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
        }

        let chain_change = ChainChange::new(event.committed());

        let _ =
            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
    }
}