// reth_rpc_eth_types/cache/mod.rs

//! Async caching support for eth RPC

use super::{EthStateCacheConfig, MultiConsumerLruCache};
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{TxHash, B256};
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::Runtime;
use schnellru::{ByLength, Limiter, LruMap};
use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedSender},
    oneshot, Semaphore,
};
use tokio_stream::wrappers::UnboundedReceiverStream;

pub mod config;
pub mod db;
pub mod metrics;
pub mod multi_consumer;
32
/// The type that can send the response to a requested [`RecoveredBlock`]
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// The type that can send a cache-only block lookup result; `None` means a cache miss.
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// The type that can send a cache-only `(block, receipts)` pair; either side may be `None`.
type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// The type that can send the response to a requested header
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

/// LRU cache of full blocks keyed by block hash, with per-key queues of waiting consumers.
type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

/// LRU cache of block receipts keyed by block hash, with per-key queues of waiting consumers.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// LRU cache of headers keyed by block hash, with per-key queues of waiting consumers.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
61
/// Provides async access to cached eth data
///
/// This is the frontend for the async caching service which manages cached data on a different
/// task.
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    /// Channel to the cache service task; every request is routed through it.
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}
70
// Manual `Clone` impl: deriving would add an unnecessary `N: Clone` bound, while only the
// channel sender actually needs to be cloned.
impl<N: NodePrimitives> Clone for EthStateCache<N> {
    fn clone(&self) -> Self {
        Self { to_service: self.to_service.clone() }
    }
}
76
impl<N: NodePrimitives> EthStateCache<N> {
    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
    fn create<Provider>(
        provider: Provider,
        action_task_spawner: Runtime,
        max_blocks: u32,
        max_receipts: u32,
        max_headers: u32,
        max_concurrent_db_operations: usize,
        max_cached_tx_hashes: u32,
    ) -> (Self, EthStateCacheService<Provider, Runtime>)
    where
        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
    {
        let (to_service, rx) = unbounded_channel();

        let service = EthStateCacheService {
            provider,
            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
            headers_cache: HeaderLruCache::new(max_headers, "headers"),
            // The service keeps a sender clone so spawned fetch tasks can report their
            // results back through the same action channel.
            action_tx: to_service.clone(),
            action_rx: UnboundedReceiverStream::new(rx),
            action_task_spawner,
            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
            tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
        };
        let cache = Self { to_service };
        (cache, service)
    }

    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
    /// spawner.
    ///
    /// The cache is memory limited by the given max bytes values.
    pub fn spawn_with<Provider>(
        provider: Provider,
        config: EthStateCacheConfig,
        executor: Runtime,
    ) -> Self
    where
        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
    {
        let EthStateCacheConfig {
            max_blocks,
            max_receipts,
            max_headers,
            max_concurrent_db_requests,
            max_cached_tx_hashes,
        } = config;
        let (this, service) = Self::create(
            provider,
            executor.clone(),
            max_blocks,
            max_receipts,
            max_headers,
            max_concurrent_db_requests,
            max_cached_tx_hashes,
        );
        executor.spawn_critical_task("eth state cache", service);
        this
    }

    /// Requests the [`RecoveredBlock`] for the block hash
    ///
    /// Returns `None` if the block does not exist.
    pub async fn get_recovered_block(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
        // A recv error means the service task was dropped.
        rx.await.map_err(|_| CacheServiceUnavailable)?
    }

    /// Requests the receipts for the block hash
    ///
    /// Returns `None` if the block was not found.
    pub async fn get_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable)?
    }

    /// Fetches both receipts and block for the given block hash.
    pub async fn get_block_and_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
        let block = self.get_recovered_block(block_hash);
        let receipts = self.get_receipts(block_hash);

        // Await both lookups concurrently; `zip` yields `Some` only if both were found.
        let (block, receipts) = futures::try_join!(block, receipts)?;

        Ok(block.zip(receipts))
    }

    /// Retrieves receipts and blocks from cache if block is in the cache, otherwise only receipts.
    pub async fn get_receipts_and_maybe_block(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
        // Cache-only block lookup: never hits the provider, so a miss yields `None`.
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });

        let receipts = self.get_receipts(block_hash);

        let (receipts, block) = futures::join!(receipts, rx);

        let block = block.map_err(|_| CacheServiceUnavailable)?;
        Ok(receipts?.map(|r| (r, block)))
    }

    /// Retrieves both block and receipts from cache if available.
    pub async fn maybe_cached_block_and_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self
            .to_service
            .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable.into())
    }

    /// Streams cached receipts and blocks for a list of block hashes, preserving input order.
    #[allow(clippy::type_complexity)]
    pub fn get_receipts_and_maybe_block_stream<'a>(
        &'a self,
        hashes: Vec<B256>,
    ) -> impl Stream<
        Item = ProviderResult<
            Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
        >,
    > + 'a {
        let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));

        // `FuturesOrdered` polls all lookups concurrently but yields results in input order.
        futures.collect::<FuturesOrdered<_>>()
    }

    /// Requests the header for the given hash.
    ///
    /// Returns an error if the header is not found.
    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
        rx.await.map_err(|_| CacheServiceUnavailable)?
    }

    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
    /// and traversing down through parent hashes. Returns blocks in descending order (newest
    /// first).
    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
    /// cache without making separate database requests.
    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
    /// with at least one block.
    pub async fn get_cached_parent_blocks(
        &self,
        block_hash: B256,
        max_blocks: usize,
    ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
            block_hash,
            max_blocks,
            response_tx,
        });

        // If the service is gone, treat it the same as an empty cache result.
        let blocks = rx.await.unwrap_or_default();
        if blocks.is_empty() {
            None
        } else {
            Some(blocks)
        }
    }

    /// Looks up a transaction by its hash in the cache index.
    ///
    /// Returns the cached block, transaction index, and optionally receipts if the transaction
    /// is in a cached block.
    pub async fn get_transaction_by_hash(
        &self,
        tx_hash: TxHash,
    ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
        let (response_tx, rx) = oneshot::channel();
        let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
        rx.await.ok()?
    }
}
/// Thrown when the cache service task dropped.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;

impl From<CacheServiceUnavailable> for ProviderError {
    /// Wraps the unavailability error as an opaque provider error so callers can propagate it
    /// with `?` in functions returning [`ProviderResult`].
    fn from(err: CacheServiceUnavailable) -> Self {
        Self::other(err)
    }
}
279
/// A task that manages caches for data required by the `eth` rpc implementation.
///
/// It provides a caching layer on top of the given
/// [`StateProvider`](reth_storage_api::StateProvider) and keeps data fetched via the provider in
/// memory in an LRU cache. If the requested data is missing in the cache it is fetched and inserted
/// into the cache afterwards. While fetching data from disk is sync, this service is async since
/// requests and data is shared via channels.
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
/// that does the IO and sends the result back to it. This way the caching service only
/// handles messages and does LRU lookups and never blocking IO.
///
/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
/// `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
/// permits to limit concurrent requests.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
> where
    Provider: BlockReader,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
{
    /// The type used to lookup data from disk
    provider: Provider,
    /// The LRU cache for full blocks grouped by their block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// The LRU cache for block receipts grouped by the block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// The LRU cache for headers.
    ///
    /// Headers are cached because they are required to populate the environment for execution
    /// (evm).
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Sender half of the action channel.
    ///
    /// Cloned into every spawned fetch task so results flow back into the same message loop.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// The type that's used to spawn tasks that do the actual work
    action_task_spawner: Tasks,
    /// Rate limiter for spawned fetch tasks.
    ///
    /// This restricts the max concurrent fetch tasks at the same time.
    rate_limiter: Arc<Semaphore>,
    /// LRU index mapping transaction hashes to their block hash and index within the block.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
333
impl<Provider> EthStateCacheService<Provider, Runtime>
where
    Provider: BlockReader + Clone + Unpin + 'static,
{
    /// Indexes all transactions in a block by transaction hash.
    fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
        let block_hash = block.hash();
        for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
            self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
        }
    }

    /// Removes transaction index entries for a reorged block.
    ///
    /// Only removes entries that still point to this block, preserving mappings for transactions
    /// that were re-mined in a new canonical block.
    fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
        let block_hash = block.hash();
        for tx in block.body().transactions() {
            // Guard: only drop the entry if it still maps to *this* block's hash.
            if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
                *mapped_hash == block_hash
            {
                self.tx_hash_index.remove(tx.tx_hash());
            }
        }
    }

    /// Handles a completed block lookup: answers every queued waiter for this hash, then
    /// caches the block on a successful hit.
    fn on_new_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
    ) {
        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }

        // cache good block
        if let Ok(Some(block)) = res {
            self.full_block_cache.insert(block_hash, block);
        }
    }

    /// Handles a completed receipts lookup: answers every queued waiter for this hash, then
    /// caches the receipts on a successful hit.
    fn on_new_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }

        // cache good receipts
        if let Ok(Some(receipts)) = res {
            self.receipts_cache.insert(block_hash, receipts);
        }
    }

    /// Evicts a reorged block and answers queued waiters; unlike [`Self::on_new_block`] the
    /// reorged data is never inserted back into the cache.
    fn on_reorg_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
    ) {
        let res = res.map(|b| b.map(Arc::new));
        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }
    }

    /// Evicts reorged receipts and answers queued waiters; the reorged data is never re-cached.
    fn on_reorg_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
            // send the response to queued senders
            for tx in queued {
                let _ = tx.send(res.clone());
            }
        }
    }

    /// Shrinks the queues but leaves some space for the next requests
    fn shrink_queues(&mut self) {
        let min_capacity = 2;
        self.full_block_cache.shrink_to(min_capacity);
        self.receipts_cache.shrink_to(min_capacity);
        self.headers_cache.shrink_to(min_capacity);
    }

    /// Pushes the current cache sizes to the metrics recorder for all three caches.
    fn update_cached_metrics(&self) {
        self.full_block_cache.update_cached_metrics();
        self.receipts_cache.update_cached_metrics();
        self.headers_cache.update_cached_metrics();
    }
}
438
impl<Provider> Future for EthStateCacheService<Provider, Runtime>
where
    Provider: BlockReader + Clone + Unpin + 'static,
{
    type Output = ();

    /// Endless message loop: drains the action channel, serving cache hits inline and
    /// spawning rate-limited blocking tasks for misses. Never resolves.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
                // shrink queues if we don't have any work to do
                this.shrink_queues();
                return Poll::Pending;
            };

            match action {
                None => {
                    // The service holds `action_tx`, so the channel can never close.
                    unreachable!("can't close")
                }
                Some(action) => {
                    match action {
                        // Cache-only lookup: always answers immediately, `None` on miss.
                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
                            let _ =
                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
                        }
                        // Cache-only lookup of both caches; either side may be `None`.
                        CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
                            let block = this.full_block_cache.get(&block_hash).cloned();
                            let receipts = this.receipts_cache.get(&block_hash).cloned();
                            let _ = response_tx.send((block, receipts));
                        }
                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(Some(block)));
                                continue
                            }

                            // block is not in the cache, request it if this is the first consumer
                            if this.full_block_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                let mut action_sender =
                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
                                this.action_task_spawner.spawn_blocking_task(async move {
                                    // Acquire permit
                                    let _permit = rate_limiter.acquire().await;
                                    // Only look in the database to prevent situations where we
                                    // looking up the tree is blocking
                                    let block_sender = provider
                                        .sealed_block_with_senders(
                                            BlockHashOrNumber::Hash(block_hash),
                                            TransactionVariant::WithHash,
                                        )
                                        .map(|maybe_block| maybe_block.map(Arc::new));
                                    action_sender.send_block(block_sender);
                                });
                            }
                        }
                        CacheAction::GetReceipts { block_hash, response_tx } => {
                            // check if block is cached
                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(Some(receipts)));
                                continue
                            }

                            // block is not in the cache, request it if this is the first consumer
                            if this.receipts_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                let mut action_sender =
                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
                                this.action_task_spawner.spawn_blocking_task(async move {
                                    // Acquire permit
                                    let _permit = rate_limiter.acquire().await;
                                    let res = provider
                                        .receipts_by_block(block_hash.into())
                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));

                                    action_sender.send_receipts(res);
                                });
                            }
                        }
                        CacheAction::GetHeader { block_hash, response_tx } => {
                            // check if the header is cached
                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(header));
                                continue
                            }

                            // it's possible we have the entire block cached
                            if let Some(block) = this.full_block_cache.get(&block_hash) {
                                let _ = response_tx.send(Ok(block.clone_header()));
                                continue
                            }

                            // header is not in the cache, request it if this is the first
                            // consumer
                            if this.headers_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                let mut action_sender =
                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
                                this.action_task_spawner.spawn_blocking_task(async move {
                                    // Acquire permit
                                    let _permit = rate_limiter.acquire().await;
                                    // A missing header is an error here, unlike blocks/receipts.
                                    let header = provider.header(block_hash).and_then(|header| {
                                        header.ok_or_else(|| {
                                            ProviderError::HeaderNotFound(block_hash.into())
                                        })
                                    });
                                    action_sender.send_header(header);
                                });
                            }
                        }
                        CacheAction::ReceiptsResult { block_hash, res } => {
                            this.on_new_receipts(block_hash, res);
                        }
                        // NOTE(review): all three arms forward the result to `on_new_block`
                        // unchanged; this match could collapse to a single call.
                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
                            Ok(Some(block_with_senders)) => {
                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
                            }
                            Ok(None) => {
                                this.on_new_block(block_hash, Ok(None));
                            }
                            Err(e) => {
                                this.on_new_block(block_hash, Err(e));
                            }
                        },
                        CacheAction::HeaderResult { block_hash, res } => {
                            let res = *res;
                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
                                // send the response to queued senders
                                for tx in queued {
                                    let _ = tx.send(res.clone());
                                }
                            }

                            // cache good header
                            if let Ok(data) = res {
                                this.headers_cache.insert(block_hash, data);
                            }
                        }
                        CacheAction::CacheNewCanonicalChain { chain_change } => {
                            for block in chain_change.blocks {
                                // Index transactions before caching the block
                                this.index_block_transactions(&block);
                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
                            }

                            for block_receipts in chain_change.receipts {
                                this.on_new_receipts(
                                    block_receipts.block_hash,
                                    Ok(Some(Arc::new(block_receipts.receipts))),
                                );
                            }
                        }
                        CacheAction::RemoveReorgedChain { chain_change } => {
                            for block in chain_change.blocks {
                                // Remove transaction index entries for reorged blocks
                                this.remove_block_transactions(&block);
                                this.on_reorg_block(block.hash(), Ok(Some(block)));
                            }

                            for block_receipts in chain_change.receipts {
                                this.on_reorg_receipts(
                                    block_receipts.block_hash,
                                    Ok(Some(Arc::new(block_receipts.receipts))),
                                );
                            }
                        }
                        CacheAction::GetCachedParentBlocks {
                            block_hash,
                            max_blocks,
                            response_tx,
                        } => {
                            let mut blocks = Vec::new();
                            let mut current_hash = block_hash;

                            // Start with the requested block
                            while blocks.len() < max_blocks {
                                if let Some(block) =
                                    this.full_block_cache.get(&current_hash).cloned()
                                {
                                    // Get the parent hash for the next iteration
                                    current_hash = block.header().parent_hash();
                                    blocks.push(block);
                                } else {
                                    // Break the loop if we can't find the current block
                                    break;
                                }
                            }

                            let _ = response_tx.send(blocks);
                        }
                        CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
                            // Resolve hash -> (block_hash, idx) via the index, then require the
                            // block to still be cached; receipts are attached opportunistically.
                            let result =
                                this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
                                    let block = this.full_block_cache.get(block_hash).cloned()?;
                                    let receipts = this.receipts_cache.get(block_hash).cloned();
                                    Some(CachedTransaction::new(block, *idx, receipts))
                                });
                            let _ = response_tx.send(result);
                        }
                    };
                    this.update_cached_metrics();
                }
            }
        }
    }
}
652
/// All message variants sent through the channel
enum CacheAction<B: Block, R> {
    /// Fetch the full recovered block for `block_hash`; falls back to the provider on a miss.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Fetch the header for `block_hash`; falls back to the provider on a miss.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Fetch the receipts of `block_hash`; falls back to the provider on a miss.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Cache-only block lookup; responds `None` on a miss without touching the provider.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Cache-only lookup of both the block and its receipts.
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    /// Result reported back by a spawned block fetch task.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Result reported back by a spawned receipts fetch task.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Result reported back by a spawned header fetch task (boxed result, presumably to keep
    /// the enum variant small — TODO confirm).
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Cache all blocks and receipts of a newly committed canonical chain segment.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Evict blocks/receipts of a reorged chain segment and notify queued consumers.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Walk cached blocks from `block_hash` via parent hashes (newest first).
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Look up a transaction's cached data by its hash
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}
704
/// Receipts of a single block, paired with that block's hash.
struct BlockReceipts<R> {
    /// Hash of the block these receipts belong to.
    block_hash: B256,
    /// All receipts of the block.
    receipts: Vec<R>,
}
709
/// A change of the canonical chain
struct ChainChange<B: Block, R> {
    /// The blocks of the changed chain segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// The receipts of the changed segment, grouped per block.
    receipts: Vec<BlockReceipts<R>>,
}
715
716impl<B: Block, R: Clone> ChainChange<B, R> {
717    fn new<N>(chain: Arc<Chain<N>>) -> Self
718    where
719        N: NodePrimitives<Block = B, Receipt = R>,
720    {
721        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
722            .blocks_and_receipts()
723            .map(|(block, receipts)| {
724                let block_receipts =
725                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
726                (block.clone(), block_receipts)
727            })
728            .unzip();
729        Self { blocks, receipts }
730    }
731}
732
/// Identifier for the caches.
///
/// Used by [`ActionSender`] on drop to pick which error response variant to emit.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// The block-with-senders cache.
    Block,
    /// The receipts cache.
    Receipt,
    /// The header cache.
    Header,
}
740
/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
/// before a result could be sent.
///
/// This type wraps a sender and in case the sender is still present on drop emit an error response.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Which cache the pending request targets; selects the fallback error variant emitted on
    /// drop.
    kind: CacheKind,
    /// Hash of the block the pending request refers to.
    blockhash: B256,
    /// Channel back to the cache service; set to `None` once a response has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
751
752impl<R: Send + Sync, B: Block> ActionSender<B, R> {
753    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
754        Self { kind, blockhash, tx: Some(tx) }
755    }
756
757    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
758        if let Some(tx) = self.tx.take() {
759            let _ = tx.send(CacheAction::BlockWithSendersResult {
760                block_hash: self.blockhash,
761                res: block_sender,
762            });
763        }
764    }
765
766    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
767        if let Some(tx) = self.tx.take() {
768            let _ =
769                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
770        }
771    }
772
773    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
774        if let Some(tx) = self.tx.take() {
775            let _ = tx.send(CacheAction::HeaderResult {
776                block_hash: self.blockhash,
777                res: Box::new(header),
778            });
779        }
780    }
781}
782impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
783    fn drop(&mut self) {
784        if let Some(tx) = self.tx.take() {
785            let msg = match self.kind {
786                CacheKind::Block => CacheAction::BlockWithSendersResult {
787                    block_hash: self.blockhash,
788                    res: Err(CacheServiceUnavailable.into()),
789                },
790                CacheKind::Receipt => CacheAction::ReceiptsResult {
791                    block_hash: self.blockhash,
792                    res: Err(CacheServiceUnavailable.into()),
793                },
794                CacheKind::Header => CacheAction::HeaderResult {
795                    block_hash: self.blockhash,
796                    res: Box::new(Err(CacheServiceUnavailable.into())),
797                },
798            };
799            let _ = tx.send(msg);
800        }
801    }
802}
803
804/// Awaits for new chain events and directly inserts them into the cache so they're available
805/// immediately before they need to be fetched from disk.
806///
807/// Reorged blocks are removed from the cache.
808pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
809    eth_state_cache: EthStateCache<N>,
810    mut events: St,
811) where
812    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
813{
814    while let Some(event) = events.next().await {
815        if let Some(reverted) = event.reverted() {
816            let chain_change = ChainChange::new(reverted);
817
818            let _ =
819                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
820        }
821
822        let chain_change = ChainChange::new(event.committed());
823
824        let _ =
825            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
826    }
827}