reth_rpc_eth_types/cache/
mod.rs

1//! Async caching support for eth RPC
2
3use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockHashOrNumber;
6use alloy_primitives::B256;
7use futures::{future::Either, Stream, StreamExt};
8use reth_chain_state::CanonStateNotification;
9use reth_errors::{ProviderError, ProviderResult};
10use reth_execution_types::Chain;
11use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
12use reth_storage_api::{BlockReader, TransactionVariant};
13use reth_tasks::{TaskSpawner, TokioTaskExecutor};
14use schnellru::{ByLength, Limiter};
15use std::{
16    future::Future,
17    pin::Pin,
18    sync::Arc,
19    task::{Context, Poll},
20};
21use tokio::sync::{
22    mpsc::{unbounded_channel, UnboundedSender},
23    oneshot, Semaphore,
24};
25use tokio_stream::wrappers::UnboundedReceiverStream;
26
27pub mod config;
28pub mod db;
29pub mod metrics;
30pub mod multi_consumer;
31
/// The type that can send the response to a request for the transactions of a block.
///
/// The payload is `None` if the block does not exist.
type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;

/// The type that can send the response to a requested [`RecoveredBlock`].
///
/// The payload is `None` if the block does not exist.
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// The type that can send the response to a cache-only block lookup.
///
/// The payload is `None` if the block is not currently cached; no provider error can occur
/// because this lookup never goes to the provider.
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// The type that can send the response to a requested header.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// The type that can send the response with a chain of cached blocks.
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// LRU cache of recovered blocks keyed by block hash.
///
/// A consumer waiting for a block is either a full-block requester (`Either::Left`) or a
/// requester that only wants the block's transactions (`Either::Right`).
type BlockLruCache<B, L> = MultiConsumerLruCache<
    B256,
    Arc<RecoveredBlock<B>>,
    L,
    Either<
        BlockWithSendersResponseSender<B>,
        BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
    >,
>;

/// LRU cache of block receipts keyed by block hash.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// LRU cache of headers keyed by block hash.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
64
/// Provides async access to cached eth data
///
/// This is the frontend for the async caching service which manages cached data on a different
/// task. Every request is turned into a `CacheAction` message and the answer is awaited on a
/// oneshot channel, so the handle itself holds no cached data.
#[derive(Debug)]
pub struct EthStateCache<B: Block, R> {
    /// Sender half of the unbounded action channel read by the cache service.
    to_service: UnboundedSender<CacheAction<B, R>>,
}
73
74impl<B: Block, R> Clone for EthStateCache<B, R> {
75    fn clone(&self) -> Self {
76        Self { to_service: self.to_service.clone() }
77    }
78}
79
80impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
81    /// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
82    fn create<Provider, Tasks>(
83        provider: Provider,
84        action_task_spawner: Tasks,
85        max_blocks: u32,
86        max_receipts: u32,
87        max_headers: u32,
88        max_concurrent_db_operations: usize,
89    ) -> (Self, EthStateCacheService<Provider, Tasks>)
90    where
91        Provider: BlockReader<Block = B, Receipt = R>,
92    {
93        let (to_service, rx) = unbounded_channel();
94        let service = EthStateCacheService {
95            provider,
96            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
97            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
98            headers_cache: HeaderLruCache::new(max_headers, "headers"),
99            action_tx: to_service.clone(),
100            action_rx: UnboundedReceiverStream::new(rx),
101            action_task_spawner,
102            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
103        };
104        let cache = Self { to_service };
105        (cache, service)
106    }
107
108    /// Creates a new async LRU backed cache service task and spawns it to a new task via
109    /// [`tokio::spawn`].
110    ///
111    /// See also [`Self::spawn_with`]
112    pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
113    where
114        Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
115    {
116        Self::spawn_with(provider, config, TokioTaskExecutor::default())
117    }
118
119    /// Creates a new async LRU backed cache service task and spawns it to a new task via the given
120    /// spawner.
121    ///
122    /// The cache is memory limited by the given max bytes values.
123    pub fn spawn_with<Provider, Tasks>(
124        provider: Provider,
125        config: EthStateCacheConfig,
126        executor: Tasks,
127    ) -> Self
128    where
129        Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
130        Tasks: TaskSpawner + Clone + 'static,
131    {
132        let EthStateCacheConfig {
133            max_blocks,
134            max_receipts,
135            max_headers,
136            max_concurrent_db_requests,
137        } = config;
138        let (this, service) = Self::create(
139            provider,
140            executor.clone(),
141            max_blocks,
142            max_receipts,
143            max_headers,
144            max_concurrent_db_requests,
145        );
146        executor.spawn_critical("eth state cache", Box::pin(service));
147        this
148    }
149
150    /// Requests the  [`RecoveredBlock`] for the block hash
151    ///
152    /// Returns `None` if the block does not exist.
153    pub async fn get_recovered_block(
154        &self,
155        block_hash: B256,
156    ) -> ProviderResult<Option<Arc<RecoveredBlock<B>>>> {
157        let (response_tx, rx) = oneshot::channel();
158        let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
159        rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
160    }
161
162    /// Requests the receipts for the block hash
163    ///
164    /// Returns `None` if the block was not found.
165    pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult<Option<Arc<Vec<R>>>> {
166        let (response_tx, rx) = oneshot::channel();
167        let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
168        rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
169    }
170
171    /// Fetches both receipts and block for the given block hash.
172    pub async fn get_block_and_receipts(
173        &self,
174        block_hash: B256,
175    ) -> ProviderResult<Option<(Arc<RecoveredBlock<B>>, Arc<Vec<R>>)>> {
176        let block = self.get_recovered_block(block_hash);
177        let receipts = self.get_receipts(block_hash);
178
179        let (block, receipts) = futures::try_join!(block, receipts)?;
180
181        Ok(block.zip(receipts))
182    }
183
184    /// Retrieves receipts and blocks from cache if block is in the cache, otherwise only receipts.
185    pub async fn get_receipts_and_maybe_block(
186        &self,
187        block_hash: B256,
188    ) -> ProviderResult<Option<(Arc<Vec<R>>, Option<Arc<RecoveredBlock<B>>>)>> {
189        let (response_tx, rx) = oneshot::channel();
190        let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
191
192        let receipts = self.get_receipts(block_hash);
193
194        let (receipts, block) = futures::join!(receipts, rx);
195
196        let block = block.map_err(|_| ProviderError::CacheServiceUnavailable)?;
197        Ok(receipts?.map(|r| (r, block)))
198    }
199
200    /// Requests the header for the given hash.
201    ///
202    /// Returns an error if the header is not found.
203    pub async fn get_header(&self, block_hash: B256) -> ProviderResult<B::Header> {
204        let (response_tx, rx) = oneshot::channel();
205        let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
206        rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
207    }
208
209    /// Retrieves a chain of connected blocks from the cache, starting from the given block hash
210    /// and traversing down through parent hashes. Returns blocks in descending order (newest
211    /// first).
212    /// This is useful for efficiently retrieving a sequence of blocks that might already be in
213    /// cache without making separate database requests.
214    /// Returns `None` if no blocks are found in the cache, otherwise returns `Some(Vec<...>)`
215    /// with at least one block.
216    pub async fn get_cached_parent_blocks(
217        &self,
218        block_hash: B256,
219        max_blocks: usize,
220    ) -> Option<Vec<Arc<RecoveredBlock<B>>>> {
221        let (response_tx, rx) = oneshot::channel();
222        let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
223            block_hash,
224            max_blocks,
225            response_tx,
226        });
227
228        let blocks = rx.await.unwrap_or_default();
229        if blocks.is_empty() {
230            None
231        } else {
232            Some(blocks)
233        }
234    }
235}
236
/// A task that manages caches for data required by the `eth` rpc implementation.
///
/// It provides a caching layer on top of the given [`BlockReader`] and keeps data fetched via
/// the provider in memory in an LRU cache. If the requested data is missing in the cache it is
/// fetched and inserted into the cache afterwards. While fetching data from disk is sync, this
/// service is async since requests and data are shared via channels.
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [`EthStateCache`] via a channel. If the requested data is not cached then it spawns a new task
/// that does the IO and sends the result back to it. This way the caching service only
/// handles messages and does LRU lookups and never blocking IO.
///
/// Caution: The channel for the data is _unbounded_. It is assumed that this is mainly used by
/// the `reth_rpc::EthApi` which is typically invoked by the RPC server, which already uses
/// permits to limit concurrent requests.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
> where
    Provider: BlockReader,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
{
    /// The type used to lookup data from disk
    provider: Provider,
    /// The LRU cache for full blocks grouped by their block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// The LRU cache for block receipts grouped by the block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// The LRU cache for headers.
    ///
    /// Headers are cached because they are required to populate the environment for execution
    /// (evm).
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Sender half of the action channel.
    ///
    /// Cloned into every spawned fetch task so the task can report its result back to the
    /// service.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// The type that's used to spawn tasks that do the actual work
    action_task_spawner: Tasks,
    /// Rate limiter for spawned fetch tasks.
    ///
    /// This restricts the max concurrent fetch tasks at the same time.
    rate_limiter: Arc<Semaphore>,
}
288
289impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
290where
291    Provider: BlockReader + Clone + Unpin + 'static,
292    Tasks: TaskSpawner + Clone + 'static,
293{
294    fn on_new_block(
295        &mut self,
296        block_hash: B256,
297        res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
298    ) {
299        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
300            // send the response to queued senders
301            for tx in queued {
302                match tx {
303                    Either::Left(block_with_senders) => {
304                        let _ = block_with_senders.send(res.clone());
305                    }
306                    Either::Right(transaction_tx) => {
307                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
308                            maybe_block.map(|block| block.body().transactions().to_vec())
309                        }));
310                    }
311                }
312            }
313        }
314
315        // cache good block
316        if let Ok(Some(block)) = res {
317            self.full_block_cache.insert(block_hash, block);
318        }
319    }
320
321    fn on_new_receipts(
322        &mut self,
323        block_hash: B256,
324        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
325    ) {
326        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
327            // send the response to queued senders
328            for tx in queued {
329                let _ = tx.send(res.clone());
330            }
331        }
332
333        // cache good receipts
334        if let Ok(Some(receipts)) = res {
335            self.receipts_cache.insert(block_hash, receipts);
336        }
337    }
338
339    fn on_reorg_block(
340        &mut self,
341        block_hash: B256,
342        res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
343    ) {
344        let res = res.map(|b| b.map(Arc::new));
345        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
346            // send the response to queued senders
347            for tx in queued {
348                match tx {
349                    Either::Left(block_with_senders) => {
350                        let _ = block_with_senders.send(res.clone());
351                    }
352                    Either::Right(transaction_tx) => {
353                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
354                            maybe_block.map(|block| block.body().transactions().to_vec())
355                        }));
356                    }
357                }
358            }
359        }
360    }
361
362    fn on_reorg_receipts(
363        &mut self,
364        block_hash: B256,
365        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
366    ) {
367        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
368            // send the response to queued senders
369            for tx in queued {
370                let _ = tx.send(res.clone());
371            }
372        }
373    }
374
375    /// Shrinks the queues but leaves some space for the next requests
376    fn shrink_queues(&mut self) {
377        let min_capacity = 2;
378        self.full_block_cache.shrink_to(min_capacity);
379        self.receipts_cache.shrink_to(min_capacity);
380        self.headers_cache.shrink_to(min_capacity);
381    }
382
383    fn update_cached_metrics(&self) {
384        self.full_block_cache.update_cached_metrics();
385        self.receipts_cache.update_cached_metrics();
386        self.headers_cache.update_cached_metrics();
387    }
388}
389
390impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
391where
392    Provider: BlockReader + Clone + Unpin + 'static,
393    Tasks: TaskSpawner + Clone + 'static,
394{
395    type Output = ();
396
397    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398        let this = self.get_mut();
399
400        loop {
401            let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
402                // shrink queues if we don't have any work to do
403                this.shrink_queues();
404                return Poll::Pending;
405            };
406
407            match action {
408                None => {
409                    unreachable!("can't close")
410                }
411                Some(action) => {
412                    match action {
413                        CacheAction::GetCachedBlock { block_hash, response_tx } => {
414                            let _ =
415                                response_tx.send(this.full_block_cache.get(&block_hash).cloned());
416                        }
417                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
418                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
419                                let _ = response_tx.send(Ok(Some(block)));
420                                continue
421                            }
422
423                            // block is not in the cache, request it if this is the first consumer
424                            if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
425                                let provider = this.provider.clone();
426                                let action_tx = this.action_tx.clone();
427                                let rate_limiter = this.rate_limiter.clone();
428                                let mut action_sender =
429                                    ActionSender::new(CacheKind::Block, block_hash, action_tx);
430                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
431                                    // Acquire permit
432                                    let _permit = rate_limiter.acquire().await;
433                                    // Only look in the database to prevent situations where we
434                                    // looking up the tree is blocking
435                                    let block_sender = provider
436                                        .sealed_block_with_senders(
437                                            BlockHashOrNumber::Hash(block_hash),
438                                            TransactionVariant::WithHash,
439                                        )
440                                        .map(|maybe_block| maybe_block.map(Arc::new));
441                                    action_sender.send_block(block_sender);
442                                }));
443                            }
444                        }
445                        CacheAction::GetReceipts { block_hash, response_tx } => {
446                            // check if block is cached
447                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
448                                let _ = response_tx.send(Ok(Some(receipts)));
449                                continue
450                            }
451
452                            // block is not in the cache, request it if this is the first consumer
453                            if this.receipts_cache.queue(block_hash, response_tx) {
454                                let provider = this.provider.clone();
455                                let action_tx = this.action_tx.clone();
456                                let rate_limiter = this.rate_limiter.clone();
457                                let mut action_sender =
458                                    ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
459                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
460                                    // Acquire permit
461                                    let _permit = rate_limiter.acquire().await;
462                                    let res = provider
463                                        .receipts_by_block(block_hash.into())
464                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));
465
466                                    action_sender.send_receipts(res);
467                                }));
468                            }
469                        }
470                        CacheAction::GetHeader { block_hash, response_tx } => {
471                            // check if the header is cached
472                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
473                                let _ = response_tx.send(Ok(header));
474                                continue
475                            }
476
477                            // it's possible we have the entire block cached
478                            if let Some(block) = this.full_block_cache.get(&block_hash) {
479                                let _ = response_tx.send(Ok(block.clone_header()));
480                                continue
481                            }
482
483                            // header is not in the cache, request it if this is the first
484                            // consumer
485                            if this.headers_cache.queue(block_hash, response_tx) {
486                                let provider = this.provider.clone();
487                                let action_tx = this.action_tx.clone();
488                                let rate_limiter = this.rate_limiter.clone();
489                                let mut action_sender =
490                                    ActionSender::new(CacheKind::Header, block_hash, action_tx);
491                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
492                                    // Acquire permit
493                                    let _permit = rate_limiter.acquire().await;
494                                    let header = provider.header(&block_hash).and_then(|header| {
495                                        header.ok_or_else(|| {
496                                            ProviderError::HeaderNotFound(block_hash.into())
497                                        })
498                                    });
499                                    action_sender.send_header(header);
500                                }));
501                            }
502                        }
503                        CacheAction::ReceiptsResult { block_hash, res } => {
504                            this.on_new_receipts(block_hash, res);
505                        }
506                        CacheAction::BlockWithSendersResult { block_hash, res } => match res {
507                            Ok(Some(block_with_senders)) => {
508                                this.on_new_block(block_hash, Ok(Some(block_with_senders)));
509                            }
510                            Ok(None) => {
511                                this.on_new_block(block_hash, Ok(None));
512                            }
513                            Err(e) => {
514                                this.on_new_block(block_hash, Err(e));
515                            }
516                        },
517                        CacheAction::HeaderResult { block_hash, res } => {
518                            let res = *res;
519                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
520                                // send the response to queued senders
521                                for tx in queued {
522                                    let _ = tx.send(res.clone());
523                                }
524                            }
525
526                            // cache good header
527                            if let Ok(data) = res {
528                                this.headers_cache.insert(block_hash, data);
529                            }
530                        }
531                        CacheAction::CacheNewCanonicalChain { chain_change } => {
532                            for block in chain_change.blocks {
533                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
534                            }
535
536                            for block_receipts in chain_change.receipts {
537                                this.on_new_receipts(
538                                    block_receipts.block_hash,
539                                    Ok(Some(Arc::new(block_receipts.receipts))),
540                                );
541                            }
542                        }
543                        CacheAction::RemoveReorgedChain { chain_change } => {
544                            for block in chain_change.blocks {
545                                this.on_reorg_block(block.hash(), Ok(Some(block)));
546                            }
547
548                            for block_receipts in chain_change.receipts {
549                                this.on_reorg_receipts(
550                                    block_receipts.block_hash,
551                                    Ok(Some(Arc::new(block_receipts.receipts))),
552                                );
553                            }
554                        }
555                        CacheAction::GetCachedParentBlocks {
556                            block_hash,
557                            max_blocks,
558                            response_tx,
559                        } => {
560                            let mut blocks = Vec::new();
561                            let mut current_hash = block_hash;
562
563                            // Start with the requested block
564                            while blocks.len() < max_blocks {
565                                if let Some(block) =
566                                    this.full_block_cache.get(&current_hash).cloned()
567                                {
568                                    // Get the parent hash for the next iteration
569                                    current_hash = block.header().parent_hash();
570                                    blocks.push(block);
571                                } else {
572                                    // Break the loop if we can't find the current block
573                                    break;
574                                }
575                            }
576
577                            let _ = response_tx.send(blocks);
578                        }
579                    };
580                    this.update_cached_metrics();
581                }
582            }
583        }
584    }
585}
586
/// All message variants sent through the channel
///
/// The `Get*` variants are requests from the [`EthStateCache`] frontend, the `*Result` variants
/// are sent by fetch tasks reporting back to the service, and the two chain variants are sent by
/// [`cache_new_blocks_task`].
enum CacheAction<B: Block, R> {
    /// Request for the recovered block with the given hash.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request for the header with the given hash.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request for the receipts of the block with the given hash.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Cache-only lookup of a block; answered with `None` if the block is not cached.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Outcome of a block fetch task.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Outcome of a receipts fetch task.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Outcome of a header fetch task.
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Blocks and receipts of a newly committed chain that should be inserted into the caches.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Blocks and receipts of a reverted chain that should be removed from the caches.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Cache-only request for up to `max_blocks` connected blocks, starting at `block_hash` and
    /// following parent hashes.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
}
629
/// The receipts of a single block, tagged with the hash of that block.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// All receipts of that block.
    receipts: Vec<R>,
}
634
/// A change of the canonical chain
///
/// Holds owned copies of the affected blocks and, per block, its receipts.
struct ChainChange<B: Block, R> {
    /// The blocks that are part of the change.
    blocks: Vec<RecoveredBlock<B>>,
    /// The receipts of those blocks, one entry per block.
    receipts: Vec<BlockReceipts<R>>,
}
640
641impl<B: Block, R: Clone> ChainChange<B, R> {
642    fn new<N>(chain: Arc<Chain<N>>) -> Self
643    where
644        N: NodePrimitives<Block = B, Receipt = R>,
645    {
646        let (blocks, receipts): (Vec<_>, Vec<_>) = chain
647            .blocks_and_receipts()
648            .map(|(block, receipts)| {
649                let block_receipts =
650                    BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
651                (block.clone(), block_receipts)
652            })
653            .unzip();
654        Self { blocks, receipts }
655    }
656}
657
/// Identifier for the caches.
///
/// Stored in an [`ActionSender`] so that its `Drop` impl knows which kind of result message to
/// emit.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// The full block cache.
    Block,
    /// The receipts cache.
    Receipt,
    /// The header cache.
    Header,
}
665
/// Drop aware sender struct that ensures a response is always emitted even if the db task panics
/// before a result could be sent.
///
/// This type wraps a sender; if the sender is still present when the value is dropped, an error
/// response is emitted instead.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Which kind of result message this sender reports.
    kind: CacheKind,
    /// Hash of the block the fetch was started for.
    blockhash: B256,
    /// Channel back to the cache service; taken (set to `None`) once a result has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
676
677impl<R: Send + Sync, B: Block> ActionSender<B, R> {
678    const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
679        Self { kind, blockhash, tx: Some(tx) }
680    }
681
682    fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
683        if let Some(tx) = self.tx.take() {
684            let _ = tx.send(CacheAction::BlockWithSendersResult {
685                block_hash: self.blockhash,
686                res: block_sender,
687            });
688        }
689    }
690
691    fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
692        if let Some(tx) = self.tx.take() {
693            let _ =
694                tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
695        }
696    }
697
698    fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
699        if let Some(tx) = self.tx.take() {
700            let _ = tx.send(CacheAction::HeaderResult {
701                block_hash: self.blockhash,
702                res: Box::new(header),
703            });
704        }
705    }
706}
707impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
708    fn drop(&mut self) {
709        if let Some(tx) = self.tx.take() {
710            let msg = match self.kind {
711                CacheKind::Block => CacheAction::BlockWithSendersResult {
712                    block_hash: self.blockhash,
713                    res: Err(ProviderError::CacheServiceUnavailable),
714                },
715                CacheKind::Receipt => CacheAction::ReceiptsResult {
716                    block_hash: self.blockhash,
717                    res: Err(ProviderError::CacheServiceUnavailable),
718                },
719                CacheKind::Header => CacheAction::HeaderResult {
720                    block_hash: self.blockhash,
721                    res: Box::new(Err(ProviderError::CacheServiceUnavailable)),
722                },
723            };
724            let _ = tx.send(msg);
725        }
726    }
727}
728
729/// Awaits for new chain events and directly inserts them into the cache so they're available
730/// immediately before they need to be fetched from disk.
731///
732/// Reorged blocks are removed from the cache.
733pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
734    eth_state_cache: EthStateCache<N::Block, N::Receipt>,
735    mut events: St,
736) where
737    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
738{
739    while let Some(event) = events.next().await {
740        if let Some(reverted) = event.reverted() {
741            let chain_change = ChainChange::new(reverted);
742
743            let _ =
744                eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
745        }
746
747        let chain_change = ChainChange::new(event.committed());
748
749        let _ =
750            eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
751    }
752}