reth_rpc_eth_types/cache/
mod.rs

1//! Async caching support for eth RPC
2
3use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockHashOrNumber;
6use alloy_primitives::B256;
7use futures::{future::Either, stream::FuturesOrdered, Stream, StreamExt};
8use reth_chain_state::CanonStateNotification;
9use reth_errors::{ProviderError, ProviderResult};
10use reth_execution_types::Chain;
11use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
12use reth_storage_api::{BlockReader, TransactionVariant};
13use reth_tasks::{TaskSpawner, TokioTaskExecutor};
14use schnellru::{ByLength, Limiter};
15use std::{
16    future::Future,
17    pin::Pin,
18    sync::Arc,
19    task::{Context, Poll},
20};
21use tokio::sync::{
22    mpsc::{unbounded_channel, UnboundedSender},
23    oneshot, Semaphore,
24};
25use tokio_stream::wrappers::UnboundedReceiverStream;
26
27pub mod config;
28pub mod db;
29pub mod metrics;
30pub mod multi_consumer;
31
32/// The type that can send the response to a requested [`RecoveredBlock`]
33type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;
34
35/// The type that can send the response to a requested [`RecoveredBlock`]
36type BlockWithSendersResponseSender<B> =
37    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
38
39/// The type that can send the response to the requested receipts of a block.
40type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
41
42type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;
43
44type CachedBlockAndReceiptsResponseSender<B, R> =
45    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;
46
47/// The type that can send the response to a requested header
48type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
49
50/// The type that can send the response with a chain of cached blocks
51type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
52
53type BlockLruCache<B, L> = MultiConsumerLruCache<
54    B256,
55    Arc<RecoveredBlock<B>>,
56    L,
57    Either<
58        BlockWithSendersResponseSender<B>,
59        BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
60    >,
61>;
62
63type ReceiptsLruCache<R, L> =
64    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
65
66type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
67
68/// Provides async access to cached eth data
69///
70/// This is the frontend for the async caching service which manages cached data on a different
71/// task.
72#[derive(Debug)]
73pub struct EthStateCache<N: NodePrimitives> {
74    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
75}
76
77impl<N: NodePrimitives> Clone for EthStateCache<N> {
78    fn clone(&self) -> Self {
79        Self { to_service: self.to_service.clone() }
80    }
81}
82
83impl<N: NodePrimitives> EthStateCache<N> {
84    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
85    fn create<Provider, Tasks>(
86        provider: Provider,
87        action_task_spawner: Tasks,
88        max_blocks: u32,
89        max_receipts: u32,
90        max_headers: u32,
91        max_concurrent_db_operations: usize,
92    ) -> (Self, EthStateCacheService<Provider, Tasks>)
93    where
94        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
95    {
96        let (to_service, rx) = unbounded_channel();
97        let service = EthStateCacheService {
98            provider,
99            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
100            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
101            headers_cache: HeaderLruCache::new(max_headers, "headers"),
102            action_tx: to_service.clone(),
103            action_rx: UnboundedReceiverStream::new(rx),
104            action_task_spawner,
105            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
106        };
107        let cache = Self { to_service };
108        (cache, service)
109    }
110
111    /// Creates a new async LRU backed cache service task and spawns it to a new task via
112    /// [`tokio::spawn`].
113    ///
114    /// See also [`Self::spawn_with`]
115    pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
116    where
117        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
118    {
119        Self::spawn_with(provider, config, TokioTaskExecutor::default())
120    }
121
122    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
123    /// spawner.
124    ///
125    /// The cache is memory limited by the given max bytes values.
126    pub fn spawn_with<Provider, Tasks>(
127        provider: Provider,
128        config: EthStateCacheConfig,
129        executor: Tasks,
130    ) -> Self
131    where
132        Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
133        Tasks: TaskSpawner + Clone + 'static,
134    {
135        let EthStateCacheConfig {
136            max_blocks,
137            max_receipts,
138            max_headers,
139            max_concurrent_db_requests,
140        } = config;
141        let (this, service) = Self::create(
142            provider,
143            executor.clone(),
144            max_blocks,
145            max_receipts,
146            max_headers,
147            max_concurrent_db_requests,
148        );
149        executor.spawn_critical("eth state cache", Box::pin(service));
150        this
151    }
152
153    /// Requests the  [`RecoveredBlock`] for the block hash
154    ///
155    /// Returns `None` if the block does not exist.
156    pub async fn get_recovered_block(
157        &self,
158        block_hash: B256,
159    ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
160        let (response_tx, rx) = oneshot::channel();
161        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
162        rx.await.map_err(|_| CacheServiceUnavailable)?
163    }
164
165    /// Requests the receipts for the block hash
166    ///
167    /// Returns `None` if the block was not found.
168    pub async fn get_receipts(
169        &self,
170        block_hash: B256,
171    ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
172        let (response_tx, rx) = oneshot::channel();
173        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
174        rx.await.map_err(|_| CacheServiceUnavailable)?
175    }
176
177    /// Fetches both receipts and block for the given block hash.
178    pub async fn get_block_and_receipts(
179        &self,
180        block_hash: B256,
181    ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
182        let block = self.get_recovered_block(block_hash);
183        let receipts = self.get_receipts(block_hash);
184
185        let (block, receipts) = futures::try_join!(block, receipts)?;
186
187        Ok(block.zip(receipts))
188    }
189
190    /// Retrieves receipts and blocks from cache if block is in the cache, otherwise only receipts.
191    pub async fn get_receipts_and_maybe_block(
192        &self,
193        block_hash: B256,
194    ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
195        let (response_tx, rx) = oneshot::channel();
196        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
197
198        let receipts = self.get_receipts(block_hash);
199
200        let (receipts, block) = futures::join!(receipts, rx);
201
202        let block = block.map_err(|_| CacheServiceUnavailable)?;
203        Ok(receipts?.map(|r| (r, block)))
204    }
205
206    /// Retrieves both block and receipts from cache if available.
207    pub async fn maybe_cached_block_and_receipts(
208        &self,
209        block_hash: B256,
210    ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
211        let (response_tx, rx) = oneshot::channel();
212        let _ = self
213            .to_service
214            .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
215        rx.await.map_err(|_| CacheServiceUnavailable.into())
216    }
217
218    /// Streams cached receipts and blocks for a list of block hashes, preserving input order.
219    #[allow(clippy::type_complexity)]
220    pub fn get_receipts_and_maybe_block_stream<'a>(
221        &'a self,
222        hashes: Vec<B256>,
223    ) -> impl Stream<
224        Item = ProviderResult<
225            Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
226        >,
227    > + 'a {
228        let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
229
230        futures.collect::<FuturesOrdered<_>>()
231    }
232
233    /// Requests the header for the given hash.
234    ///
235    /// Returns an error if the header is not found.
236    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
237        let (response_tx, rx) = oneshot::channel();
238        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
239        rx.await.map_err(|_| CacheServiceUnavailable)?
240    }
241
242    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
243    /// and traversing down through parent hashes. Returns blocks in descending order (newest
244    /// first).
245    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
246    /// cache without making separate database requests.
247    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
248    /// with at least one block.
249    pub async fn get_cached_parent_blocks(
250        &self,
251        block_hash: B256,
252        max_blocks: usize,
253    ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
254        let (response_tx, rx) = oneshot::channel();
255        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
256            block_hash,
257            max_blocks,
258            response_tx,
259        });
260
261        let blocks = rx.await.unwrap_or_default();
262        if blocks.is_empty() {
263            None
264        } else {
265            Some(blocks)
266        }
267    }
268}
269/// Thrown when the cache service task dropped.
270#[derive(Debug, thiserror::Error)]
271#[error("cache service task stopped")]
272pub struct CacheServiceUnavailable;
273
274impl From<CacheServiceUnavailable> for ProviderError {
275    fn from(err: CacheServiceUnavailable) -> Self {
276        Self::other(err)
277    }
278}
279
280/// A task that manages caches for data required by the `eth` rpc implementation.
281///
282/// It provides a caching layer on top of the given
283/// [`StateProvider`](reth_storage_api::StateProvider) and keeps data fetched via the provider in
284/// memory in an LRU cache. If the requested data is missing in the cache it is fetched and inserted
285/// into the cache afterwards. While fetching data from disk is sync, this service is async since
286/// requests and data is shared via channels.
287///
288/// This type is an endless future that listens for incoming messages from the user facing
289/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
290/// that does the IO and sends the result back to it. This way the caching service only
291/// handles messages and does LRU lookups and never blocking IO.
292///
293/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
294/// `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
295/// permits to limit concurrent requests.
296#[must_use = "Type does nothing unless spawned"]
297pub(crate) struct EthStateCacheService<
298    Provider,
299    Tasks,
300    LimitBlocks = ByLength,
301    LimitReceipts = ByLength,
302    LimitHeaders = ByLength,
303> where
304    Provider: BlockReader,
305    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
306    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
307    LimitHeaders: Limiter<B256, Provider::Header>,
308{
309    /// The type used to lookup data from disk
310    provider: Provider,
311    /// The LRU cache for full blocks grouped by their block hash.
312    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
313    /// The LRU cache for block receipts grouped by the block hash.
314    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
315    /// The LRU cache for headers.
316    ///
317    /// Headers are cached because they are required to populate the environment for execution
318    /// (evm).
319    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
320    /// Sender half of the action channel.
321    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
322    /// Receiver half of the action channel.
323    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
324    /// The type that's used to spawn tasks that do the actual work
325    action_task_spawner: Tasks,
326    /// Rate limiter for spawned fetch tasks.
327    ///
328    /// This restricts the max concurrent fetch tasks at the same time.
329    rate_limiter: Arc<Semaphore>,
330}
331
332impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
333where
334    Provider: BlockReader + Clone + Unpin + 'static,
335    Tasks: TaskSpawner + Clone + 'static,
336{
337    fn on_new_block(
338        &mut self,
339        block_hash: B256,
340        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
341    ) {
342        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
343            // send the response to queued senders
344            for tx in queued {
345                match tx {
346                    Either::Left(block_with_senders) => {
347                        let _ = block_with_senders.send(res.clone());
348                    }
349                    Either::Right(transaction_tx) => {
350                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
351                            maybe_block.map(|block| block.body().transactions().to_vec())
352                        }));
353                    }
354                }
355            }
356        }
357
358        // cache good block
359        if let Ok(Some(block)) = res {
360            self.full_block_cache.insert(block_hash, block);
361        }
362    }
363
364    fn on_new_receipts(
365        &mut self,
366        block_hash: B256,
367        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
368    ) {
369        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
370            // send the response to queued senders
371            for tx in queued {
372                let _ = tx.send(res.clone());
373            }
374        }
375
376        // cache good receipts
377        if let Ok(Some(receipts)) = res {
378            self.receipts_cache.insert(block_hash, receipts);
379        }
380    }
381
382    fn on_reorg_block(
383        &mut self,
384        block_hash: B256,
385        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
386    ) {
387        let res = res.map(|b| b.map(Arc::new));
388        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
389            // send the response to queued senders
390            for tx in queued {
391                match tx {
392                    Either::Left(block_with_senders) => {
393                        let _ = block_with_senders.send(res.clone());
394                    }
395                    Either::Right(transaction_tx) => {
396                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
397                            maybe_block.map(|block| block.body().transactions().to_vec())
398                        }));
399                    }
400                }
401            }
402        }
403    }
404
405    fn on_reorg_receipts(
406        &mut self,
407        block_hash: B256,
408        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
409    ) {
410        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
411            // send the response to queued senders
412            for tx in queued {
413                let _ = tx.send(res.clone());
414            }
415        }
416    }
417
418    /// Shrinks the queues but leaves some space for the next requests
419    fn shrink_queues(&mut self) {
420        let min_capacity = 2;
421        self.full_block_cache.shrink_to(min_capacity);
422        self.receipts_cache.shrink_to(min_capacity);
423        self.headers_cache.shrink_to(min_capacity);
424    }
425
426    fn update_cached_metrics(&self) {
427        self.full_block_cache.update_cached_metrics();
428        self.receipts_cache.update_cached_metrics();
429        self.headers_cache.update_cached_metrics();
430    }
431}
432
433impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
434where
435    Provider: BlockReader + Clone + Unpin + 'static,
436    Tasks: TaskSpawner + Clone + 'static,
437{
438    type Output = ();
439
440    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441        let this = self.get_mut();
442
443        loop {
444            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
445                // shrink queues if we don't have any work to do
446                this.shrink_queues();
447                return Poll::Pending;
448            };
449
450            match action {
451                None => {
452                    unreachable!("can't close")
453                }
454                Some(action) => {
455                    match action {
456                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
457                            let _ =
458                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
459                        }
460                        CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
461                            let block = this.full_block_cache.get(&block_hash).cloned();
462                            let receipts = this.receipts_cache.get(&block_hash).cloned();
463                            let _ = response_tx.send((block, receipts));
464                        }
465                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
466                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
467                                let _ = response_tx.send(Ok(Some(block)));
468                                continue
469                            }
470
471                            // block is not in the cache, request it if this is the first consumer
472                            if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
473                                let provider = this.provider.clone();
474                                let action_tx = this.action_tx.clone();
475                                let rate_limiter = this.rate_limiter.clone();
476                                let mut action_sender =
477                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
478                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
479                                    // Acquire permit
480                                    let _permit = rate_limiter.acquire().await;
481                                    // Only look in the database to prevent situations where we
482                                    // looking up the tree is blocking
483                                    let block_sender = provider
484                                        .sealed_block_with_senders(
485                                            BlockHashOrNumber::Hash(block_hash),
486                                            TransactionVariant::WithHash,
487                                        )
488                                        .map(|maybe_block| maybe_block.map(Arc::new));
489                                    action_sender.send_block(block_sender);
490                                }));
491                            }
492                        }
493                        CacheAction::GetReceipts { block_hash, response_tx } => {
494                            // check if block is cached
495                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
496                                let _ = response_tx.send(Ok(Some(receipts)));
497                                continue
498                            }
499
500                            // block is not in the cache, request it if this is the first consumer
501                            if this.receipts_cache.queue(block_hash, response_tx) {
502                                let provider = this.provider.clone();
503                                let action_tx = this.action_tx.clone();
504                                let rate_limiter = this.rate_limiter.clone();
505                                let mut action_sender =
506                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
507                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
508                                    // Acquire permit
509                                    let _permit = rate_limiter.acquire().await;
510                                    let res = provider
511                                        .receipts_by_block(block_hash.into())
512                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));
513
514                                    action_sender.send_receipts(res);
515                                }));
516                            }
517                        }
518                        CacheAction::GetHeader { block_hash, response_tx } => {
519                            // check if the header is cached
520                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
521                                let _ = response_tx.send(Ok(header));
522                                continue
523                            }
524
525                            // it's possible we have the entire block cached
526                            if let Some(block) = this.full_block_cache.get(&block_hash) {
527                                let _ = response_tx.send(Ok(block.clone_header()));
528                                continue
529                            }
530
531                            // header is not in the cache, request it if this is the first
532                            // consumer
533                            if this.headers_cache.queue(block_hash, response_tx) {
534                                let provider = this.provider.clone();
535                                let action_tx = this.action_tx.clone();
536                                let rate_limiter = this.rate_limiter.clone();
537                                let mut action_sender =
538                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
539                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
540                                    // Acquire permit
541                                    let _permit = rate_limiter.acquire().await;
542                                    let header = provider.header(&block_hash).and_then(|header| {
543                                        header.ok_or_else(|| {
544                                            ProviderError::HeaderNotFound(block_hash.into())
545                                        })
546                                    });
547                                    action_sender.send_header(header);
548                                }));
549                            }
550                        }
551                        CacheAction::ReceiptsResult { block_hash, res } => {
552                            this.on_new_receipts(block_hash, res);
553                        }
554                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
555                            Ok(Some(block_with_senders)) => {
556                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
557                            }
558                            Ok(None) => {
559                                this.on_new_block(block_hash, Ok(None));
560                            }
561                            Err(e) => {
562                                this.on_new_block(block_hash, Err(e));
563                            }
564                        },
565                        CacheAction::HeaderResult { block_hash, res } => {
566                            let res = *res;
567                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
568                                // send the response to queued senders
569                                for tx in queued {
570                                    let _ = tx.send(res.clone());
571                                }
572                            }
573
574                            // cache good header
575                            if let Ok(data) = res {
576                                this.headers_cache.insert(block_hash, data);
577                            }
578                        }
579                        CacheAction::CacheNewCanonicalChain { chain_change } => {
580                            for block in chain_change.blocks {
581                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
582                            }
583
584                            for block_receipts in chain_change.receipts {
585                                this.on_new_receipts(
586                                    block_receipts.block_hash,
587                                    Ok(Some(Arc::new(block_receipts.receipts))),
588                                );
589                            }
590                        }
591                        CacheAction::RemoveReorgedChain { chain_change } => {
592                            for block in chain_change.blocks {
593                                this.on_reorg_block(block.hash(), Ok(Some(block)));
594                            }
595
596                            for block_receipts in chain_change.receipts {
597                                this.on_reorg_receipts(
598                                    block_receipts.block_hash,
599                                    Ok(Some(Arc::new(block_receipts.receipts))),
600                                );
601                            }
602                        }
603                        CacheAction::GetCachedParentBlocks {
604                            block_hash,
605                            max_blocks,
606                            response_tx,
607                        } => {
608                            let mut blocks = Vec::new();
609                            let mut current_hash = block_hash;
610
611                            // Start with the requested block
612                            while blocks.len() < max_blocks {
613                                if let Some(block) =
614                                    this.full_block_cache.get(&current_hash).cloned()
615                                {
616                                    // Get the parent hash for the next iteration
617                                    current_hash = block.header().parent_hash();
618                                    blocks.push(block);
619                                } else {
620                                    // Break the loop if we can't find the current block
621                                    break;
622                                }
623                            }
624
625                            let _ = response_tx.send(blocks);
626                        }
627                    };
628                    this.update_cached_metrics();
629                }
630            }
631        }
632    }
633}
634
635/// All message variants sent through the channel
636enum CacheAction<B: Block, R> {
637    GetBlockWithSenders {
638        block_hash: B256,
639        response_tx: BlockWithSendersResponseSender<B>,
640    },
641    GetHeader {
642        block_hash: B256,
643        response_tx: HeaderResponseSender<B::Header>,
644    },
645    GetReceipts {
646        block_hash: B256,
647        response_tx: ReceiptsResponseSender<R>,
648    },
649    GetCachedBlock {
650        block_hash: B256,
651        response_tx: CachedBlockResponseSender<B>,
652    },
653    GetCachedBlockAndReceipts {
654        block_hash: B256,
655        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
656    },
657    BlockWithSendersResult {
658        block_hash: B256,
659        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
660    },
661    ReceiptsResult {
662        block_hash: B256,
663        res: ProviderResult<Option<Arc<Vec<R>>>>,
664    },
665    HeaderResult {
666        block_hash: B256,
667        res: Box<ProviderResult<B::Header>>,
668    },
669    CacheNewCanonicalChain {
670        chain_change: ChainChange<B, R>,
671    },
672    RemoveReorgedChain {
673        chain_change: ChainChange<B, R>,
674    },
675    GetCachedParentBlocks {
676        block_hash: B256,
677        max_blocks: usize,
678        response_tx: CachedParentBlocksResponseSender<B>,
679    },
680}
681
682struct BlockReceipts<R> {
683    block_hash: B256,
684    receipts: Vec<R>,
685}
686
687/// A change of the canonical chain
688struct ChainChange<B: Block, R> {
689    blocks: Vec<RecoveredBlock<B>>,
690    receipts: Vec<BlockReceipts<R>>,
691}
692
693impl<B: Block, R: Clone> ChainChange<B, R> {
694    fn new<N>(chain: Arc<Chain<N>>) -> Self
695    where
696        N: NodePrimitives<Block = B, Receipt = R>,
697    {
698        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
699            .blocks_and_receipts()
700            .map(|(block, receipts)| {
701                let block_receipts =
702                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
703                (block.clone(), block_receipts)
704            })
705            .unzip();
706        Self { blocks, receipts }
707    }
708}
709
710/// Identifier for the caches.
711#[derive(Copy, Clone, Debug)]
712enum CacheKind {
713    Block,
714    Receipt,
715    Header,
716}
717
718/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
719/// before a result could be sent.
720///
721/// This type wraps a sender and in case the sender is still present on drop emit an error response.
722#[derive(Debug)]
723struct ActionSender<B: Block, R: Send + Sync> {
724    kind: CacheKind,
725    blockhash: B256,
726    tx: Option<UnboundedSender<CacheAction<B, R>>>,
727}
728
729impl<R: Send + Sync, B: Block> ActionSender<B, R> {
730    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
731        Self { kind, blockhash, tx: Some(tx) }
732    }
733
734    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
735        if let Some(tx) = self.tx.take() {
736            let _ = tx.send(CacheAction::BlockWithSendersResult {
737                block_hash: self.blockhash,
738                res: block_sender,
739            });
740        }
741    }
742
743    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
744        if let Some(tx) = self.tx.take() {
745            let _ =
746                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
747        }
748    }
749
750    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
751        if let Some(tx) = self.tx.take() {
752            let _ = tx.send(CacheAction::HeaderResult {
753                block_hash: self.blockhash,
754                res: Box::new(header),
755            });
756        }
757    }
758}
759impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
760    fn drop(&mut self) {
761        if let Some(tx) = self.tx.take() {
762            let msg = match self.kind {
763                CacheKind::Block => CacheAction::BlockWithSendersResult {
764                    block_hash: self.blockhash,
765                    res: Err(CacheServiceUnavailable.into()),
766                },
767                CacheKind::Receipt => CacheAction::ReceiptsResult {
768                    block_hash: self.blockhash,
769                    res: Err(CacheServiceUnavailable.into()),
770                },
771                CacheKind::Header => CacheAction::HeaderResult {
772                    block_hash: self.blockhash,
773                    res: Box::new(Err(CacheServiceUnavailable.into())),
774                },
775            };
776            let _ = tx.send(msg);
777        }
778    }
779}
780
781/// Awaits for new chain events and directly inserts them into the cache so they're available
782/// immediately before they need to be fetched from disk.
783///
784/// Reorged blocks are removed from the cache.
785pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
786    eth_state_cache: EthStateCache<N>,
787    mut events: St,
788) where
789    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
790{
791    while let Some(event) = events.next().await {
792        if let Some(reverted) = event.reverted() {
793            let chain_change = ChainChange::new(reverted);
794
795            let _ =
796                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
797        }
798
799        let chain_change = ChainChange::new(event.committed());
800
801        let _ =
802            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
803    }
804}