1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{TxHash, B256};
8use futures::{stream::FuturesOrdered, Stream, StreamExt};
9use reth_chain_state::CanonStateNotification;
10use reth_errors::{ProviderError, ProviderResult};
11use reth_execution_types::Chain;
12use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
13use reth_storage_api::{BlockReader, TransactionVariant};
14use reth_tasks::Runtime;
15use schnellru::{ByLength, Limiter, LruMap};
16use std::{
17 future::Future,
18 pin::Pin,
19 sync::Arc,
20 task::{Context, Poll},
21};
22use tokio::sync::{
23 mpsc::{unbounded_channel, UnboundedSender},
24 oneshot, Semaphore,
25};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28pub mod config;
29pub mod db;
30pub mod metrics;
31pub mod multi_consumer;
32
/// One-shot sender used to answer a request for a recovered block; the response is a
/// provider result that may contain the block.
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// One-shot sender used to answer a request for the receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// One-shot sender used to answer a cache-only block lookup (no provider result is involved).
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// One-shot sender used to answer a cache-only lookup of a block together with its receipts.
type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// One-shot sender used to answer a request for a header.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// One-shot sender used to answer a request for a chain of cached parent blocks.
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// One-shot sender used to answer a cache-only transaction lookup by hash.
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

/// LRU cache of recovered blocks keyed by block hash, with [`BlockWithSendersResponseSender`]s
/// as the queued consumers.
type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

/// LRU cache of block receipts keyed by block hash, with [`ReceiptsResponseSender`]s as the
/// queued consumers.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// LRU cache of headers keyed by block hash, with [`HeaderResponseSender`]s as the queued
/// consumers.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
61
/// Frontend handle for the cache service.
///
/// Every request is turned into a [`CacheAction`] and sent over an unbounded channel to the
/// [`EthStateCacheService`] that was created together with this handle.
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    /// Sending half of the action channel read by the service.
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}
70
71impl<N: NodePrimitives> Clone for EthStateCache<N> {
72 fn clone(&self) -> Self {
73 Self { to_service: self.to_service.clone() }
74 }
75}
76
77impl<N: NodePrimitives> EthStateCache<N> {
78 fn create<Provider>(
80 provider: Provider,
81 action_task_spawner: Runtime,
82 max_blocks: u32,
83 max_receipts: u32,
84 max_headers: u32,
85 max_concurrent_db_operations: usize,
86 max_cached_tx_hashes: u32,
87 ) -> (Self, EthStateCacheService<Provider, Runtime>)
88 where
89 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
90 {
91 let (to_service, rx) = unbounded_channel();
92
93 let service = EthStateCacheService {
94 provider,
95 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
96 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
97 headers_cache: HeaderLruCache::new(max_headers, "headers"),
98 action_tx: to_service.clone(),
99 action_rx: UnboundedReceiverStream::new(rx),
100 action_task_spawner,
101 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
102 tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
103 };
104 let cache = Self { to_service };
105 (cache, service)
106 }
107
108 pub fn spawn_with<Provider>(
113 provider: Provider,
114 config: EthStateCacheConfig,
115 executor: Runtime,
116 ) -> Self
117 where
118 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
119 {
120 let EthStateCacheConfig {
121 max_blocks,
122 max_receipts,
123 max_headers,
124 max_concurrent_db_requests,
125 max_cached_tx_hashes,
126 } = config;
127 let (this, service) = Self::create(
128 provider,
129 executor.clone(),
130 max_blocks,
131 max_receipts,
132 max_headers,
133 max_concurrent_db_requests,
134 max_cached_tx_hashes,
135 );
136 executor.spawn_critical_task("eth state cache", service);
137 this
138 }
139
140 pub async fn get_recovered_block(
144 &self,
145 block_hash: B256,
146 ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
147 let (response_tx, rx) = oneshot::channel();
148 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
149 rx.await.map_err(|_| CacheServiceUnavailable)?
150 }
151
152 pub async fn get_receipts(
156 &self,
157 block_hash: B256,
158 ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
159 let (response_tx, rx) = oneshot::channel();
160 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
161 rx.await.map_err(|_| CacheServiceUnavailable)?
162 }
163
164 pub async fn get_block_and_receipts(
166 &self,
167 block_hash: B256,
168 ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
169 let block = self.get_recovered_block(block_hash);
170 let receipts = self.get_receipts(block_hash);
171
172 let (block, receipts) = futures::try_join!(block, receipts)?;
173
174 Ok(block.zip(receipts))
175 }
176
177 pub async fn get_receipts_and_maybe_block(
179 &self,
180 block_hash: B256,
181 ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
182 let (response_tx, rx) = oneshot::channel();
183 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
184
185 let receipts = self.get_receipts(block_hash);
186
187 let (receipts, block) = futures::join!(receipts, rx);
188
189 let block = block.map_err(|_| CacheServiceUnavailable)?;
190 Ok(receipts?.map(|r| (r, block)))
191 }
192
193 pub async fn maybe_cached_block_and_receipts(
195 &self,
196 block_hash: B256,
197 ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
198 let (response_tx, rx) = oneshot::channel();
199 let _ = self
200 .to_service
201 .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
202 rx.await.map_err(|_| CacheServiceUnavailable.into())
203 }
204
205 #[allow(clippy::type_complexity)]
207 pub fn get_receipts_and_maybe_block_stream<'a>(
208 &'a self,
209 hashes: Vec<B256>,
210 ) -> impl Stream<
211 Item = ProviderResult<
212 Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
213 >,
214 > + 'a {
215 let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
216
217 futures.collect::<FuturesOrdered<_>>()
218 }
219
220 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
224 let (response_tx, rx) = oneshot::channel();
225 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
226 rx.await.map_err(|_| CacheServiceUnavailable)?
227 }
228
229 pub async fn get_cached_parent_blocks(
237 &self,
238 block_hash: B256,
239 max_blocks: usize,
240 ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
241 let (response_tx, rx) = oneshot::channel();
242 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
243 block_hash,
244 max_blocks,
245 response_tx,
246 });
247
248 let blocks = rx.await.unwrap_or_default();
249 if blocks.is_empty() {
250 None
251 } else {
252 Some(blocks)
253 }
254 }
255
256 pub async fn get_transaction_by_hash(
261 &self,
262 tx_hash: TxHash,
263 ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
264 let (response_tx, rx) = oneshot::channel();
265 let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
266 rx.await.ok()?
267 }
268}
/// Error used when a request to the cache service could not be answered: a response channel
/// was closed without a reply, or a spawned fetch ended without reporting a result.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;
273
274impl From<CacheServiceUnavailable> for ProviderError {
275 fn from(err: CacheServiceUnavailable) -> Self {
276 Self::other(err)
277 }
278}
279
/// The service behind [`EthStateCache`]: owns the caches and processes [`CacheAction`]s
/// received over the action channel.
///
/// The service is a [`Future`] and does nothing until it is polled, e.g. after being spawned.
/// On a cache miss it queues the requester and spawns a blocking task that loads the data
/// from the provider and reports the result back through the action channel.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
> where
    Provider: BlockReader,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
{
    /// Provider the data is loaded from on a cache miss.
    provider: Provider,
    /// Cache of recovered blocks, keyed by block hash.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// Cache of block receipts, keyed by block hash.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// Cache of headers, keyed by block hash.
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Sender for the action channel; cloned into spawned tasks so they can report results.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiving half of the action channel.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Spawner used for the blocking provider lookups.
    action_task_spawner: Tasks,
    /// Semaphore every spawned lookup acquires a permit from before querying the provider.
    rate_limiter: Arc<Semaphore>,
    /// Maps a transaction hash to the hash of its block and the transaction's index in that
    /// block.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
333
334impl<Provider> EthStateCacheService<Provider, Runtime>
335where
336 Provider: BlockReader + Clone + Unpin + 'static,
337{
338 fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
340 let block_hash = block.hash();
341 for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
342 self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
343 }
344 }
345
346 fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
351 let block_hash = block.hash();
352 for tx in block.body().transactions() {
353 if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
354 *mapped_hash == block_hash
355 {
356 self.tx_hash_index.remove(tx.tx_hash());
357 }
358 }
359 }
360
361 fn on_new_block(
362 &mut self,
363 block_hash: B256,
364 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
365 ) {
366 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
367 for tx in queued {
369 let _ = tx.send(res.clone());
370 }
371 }
372
373 if let Ok(Some(block)) = res {
375 self.full_block_cache.insert(block_hash, block);
376 }
377 }
378
379 fn on_new_receipts(
380 &mut self,
381 block_hash: B256,
382 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
383 ) {
384 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
385 for tx in queued {
387 let _ = tx.send(res.clone());
388 }
389 }
390
391 if let Ok(Some(receipts)) = res {
393 self.receipts_cache.insert(block_hash, receipts);
394 }
395 }
396
397 fn on_reorg_block(
398 &mut self,
399 block_hash: B256,
400 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
401 ) {
402 let res = res.map(|b| b.map(Arc::new));
403 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
404 for tx in queued {
406 let _ = tx.send(res.clone());
407 }
408 }
409 }
410
411 fn on_reorg_receipts(
412 &mut self,
413 block_hash: B256,
414 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
415 ) {
416 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
417 for tx in queued {
419 let _ = tx.send(res.clone());
420 }
421 }
422 }
423
424 fn shrink_queues(&mut self) {
426 let min_capacity = 2;
427 self.full_block_cache.shrink_to(min_capacity);
428 self.receipts_cache.shrink_to(min_capacity);
429 self.headers_cache.shrink_to(min_capacity);
430 }
431
432 fn update_cached_metrics(&self) {
433 self.full_block_cache.update_cached_metrics();
434 self.receipts_cache.update_cached_metrics();
435 self.headers_cache.update_cached_metrics();
436 }
437}
438
439impl<Provider> Future for EthStateCacheService<Provider, Runtime>
440where
441 Provider: BlockReader + Clone + Unpin + 'static,
442{
443 type Output = ();
444
445 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
446 let this = self.get_mut();
447
448 loop {
449 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
450 this.shrink_queues();
452 return Poll::Pending;
453 };
454
455 match action {
456 None => {
457 unreachable!("can't close")
458 }
459 Some(action) => {
460 match action {
461 CacheAction::GetCachedBlock { block_hash, response_tx } => {
462 let _ =
463 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
464 }
465 CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
466 let block = this.full_block_cache.get(&block_hash).cloned();
467 let receipts = this.receipts_cache.get(&block_hash).cloned();
468 let _ = response_tx.send((block, receipts));
469 }
470 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
471 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
472 let _ = response_tx.send(Ok(Some(block)));
473 continue
474 }
475
476 if this.full_block_cache.queue(block_hash, response_tx) {
478 let provider = this.provider.clone();
479 let action_tx = this.action_tx.clone();
480 let rate_limiter = this.rate_limiter.clone();
481 let mut action_sender =
482 ActionSender::new(CacheKind::Block, block_hash, action_tx);
483 this.action_task_spawner.spawn_blocking_task(async move {
484 let _permit = rate_limiter.acquire().await;
486 let block_sender = provider
489 .sealed_block_with_senders(
490 BlockHashOrNumber::Hash(block_hash),
491 TransactionVariant::WithHash,
492 )
493 .map(|maybe_block| maybe_block.map(Arc::new));
494 action_sender.send_block(block_sender);
495 });
496 }
497 }
498 CacheAction::GetReceipts { block_hash, response_tx } => {
499 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
501 let _ = response_tx.send(Ok(Some(receipts)));
502 continue
503 }
504
505 if this.receipts_cache.queue(block_hash, response_tx) {
507 let provider = this.provider.clone();
508 let action_tx = this.action_tx.clone();
509 let rate_limiter = this.rate_limiter.clone();
510 let mut action_sender =
511 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
512 this.action_task_spawner.spawn_blocking_task(async move {
513 let _permit = rate_limiter.acquire().await;
515 let res = provider
516 .receipts_by_block(block_hash.into())
517 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
518
519 action_sender.send_receipts(res);
520 });
521 }
522 }
523 CacheAction::GetHeader { block_hash, response_tx } => {
524 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
526 let _ = response_tx.send(Ok(header));
527 continue
528 }
529
530 if let Some(block) = this.full_block_cache.get(&block_hash) {
532 let _ = response_tx.send(Ok(block.clone_header()));
533 continue
534 }
535
536 if this.headers_cache.queue(block_hash, response_tx) {
539 let provider = this.provider.clone();
540 let action_tx = this.action_tx.clone();
541 let rate_limiter = this.rate_limiter.clone();
542 let mut action_sender =
543 ActionSender::new(CacheKind::Header, block_hash, action_tx);
544 this.action_task_spawner.spawn_blocking_task(async move {
545 let _permit = rate_limiter.acquire().await;
547 let header = provider.header(block_hash).and_then(|header| {
548 header.ok_or_else(|| {
549 ProviderError::HeaderNotFound(block_hash.into())
550 })
551 });
552 action_sender.send_header(header);
553 });
554 }
555 }
556 CacheAction::ReceiptsResult { block_hash, res } => {
557 this.on_new_receipts(block_hash, res);
558 }
559 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
560 Ok(Some(block_with_senders)) => {
561 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
562 }
563 Ok(None) => {
564 this.on_new_block(block_hash, Ok(None));
565 }
566 Err(e) => {
567 this.on_new_block(block_hash, Err(e));
568 }
569 },
570 CacheAction::HeaderResult { block_hash, res } => {
571 let res = *res;
572 if let Some(queued) = this.headers_cache.remove(&block_hash) {
573 for tx in queued {
575 let _ = tx.send(res.clone());
576 }
577 }
578
579 if let Ok(data) = res {
581 this.headers_cache.insert(block_hash, data);
582 }
583 }
584 CacheAction::CacheNewCanonicalChain { chain_change } => {
585 for block in chain_change.blocks {
586 this.index_block_transactions(&block);
588 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
589 }
590
591 for block_receipts in chain_change.receipts {
592 this.on_new_receipts(
593 block_receipts.block_hash,
594 Ok(Some(Arc::new(block_receipts.receipts))),
595 );
596 }
597 }
598 CacheAction::RemoveReorgedChain { chain_change } => {
599 for block in chain_change.blocks {
600 this.remove_block_transactions(&block);
602 this.on_reorg_block(block.hash(), Ok(Some(block)));
603 }
604
605 for block_receipts in chain_change.receipts {
606 this.on_reorg_receipts(
607 block_receipts.block_hash,
608 Ok(Some(Arc::new(block_receipts.receipts))),
609 );
610 }
611 }
612 CacheAction::GetCachedParentBlocks {
613 block_hash,
614 max_blocks,
615 response_tx,
616 } => {
617 let mut blocks = Vec::new();
618 let mut current_hash = block_hash;
619
620 while blocks.len() < max_blocks {
622 if let Some(block) =
623 this.full_block_cache.get(¤t_hash).cloned()
624 {
625 current_hash = block.header().parent_hash();
627 blocks.push(block);
628 } else {
629 break;
631 }
632 }
633
634 let _ = response_tx.send(blocks);
635 }
636 CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
637 let result =
638 this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
639 let block = this.full_block_cache.get(block_hash).cloned()?;
640 let receipts = this.receipts_cache.get(block_hash).cloned();
641 Some(CachedTransaction::new(block, *idx, receipts))
642 });
643 let _ = response_tx.send(result);
644 }
645 };
646 this.update_cached_metrics();
647 }
648 }
649 }
650 }
651}
652
/// All messages the cache service processes from its action channel: frontend requests,
/// results reported by spawned lookup tasks, and chain updates.
enum CacheAction<B: Block, R> {
    /// Request for a recovered block; loaded from the provider on a cache miss.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request for a header; loaded from the provider on a cache miss.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request for the receipts of a block; loaded from the provider on a cache miss.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Cache-only lookup of a block.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Cache-only lookup of a block and its receipts.
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    /// Result of a block lookup, reported by a spawned task.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Result of a receipts lookup, reported by a spawned task.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Result of a header lookup, reported by a spawned task; the result is boxed.
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Blocks and receipts of a newly committed chain that should be cached.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Blocks and receipts of a reverted chain that should be removed from the caches.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Cache-only request for up to `max_blocks` blocks, starting at `block_hash` and
    /// following parent hashes.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Cache-only lookup of a transaction by its hash.
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}
704
/// The receipts of a single block, paired with the hash of that block.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// Receipts of that block.
    receipts: Vec<R>,
}
709
/// The blocks of a chain segment and their receipts, as carried by the chain-update actions.
struct ChainChange<B: Block, R> {
    /// Blocks of the chain segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// Receipts per block, each tagged with its block hash.
    receipts: Vec<BlockReceipts<R>>,
}
715
716impl<B: Block, R: Clone> ChainChange<B, R> {
717 fn new<N>(chain: Arc<Chain<N>>) -> Self
718 where
719 N: NodePrimitives<Block = B, Receipt = R>,
720 {
721 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
722 .blocks_and_receipts()
723 .map(|(block, receipts)| {
724 let block_receipts =
725 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
726 (block.clone(), block_receipts)
727 })
728 .unzip();
729 Self { blocks, receipts }
730 }
731}
732
/// Identifies which kind of lookup an [`ActionSender`] reports on; it selects the result
/// action sent if the sender is dropped without having delivered a result.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// A block lookup.
    Block,
    /// A receipts lookup.
    Receipt,
    /// A header lookup.
    Header,
}
740
/// Reports the outcome of a spawned lookup back to the cache service.
///
/// If it is dropped before any `send_*` method was called, its `Drop` impl sends an error
/// result of the matching [`CacheKind`] instead.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Kind of lookup this sender reports on.
    kind: CacheKind,
    /// Hash of the block the lookup is for.
    blockhash: B256,
    /// Channel to the service; taken (set to `None`) once a result has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
751
752impl<R: Send + Sync, B: Block> ActionSender<B, R> {
753 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
754 Self { kind, blockhash, tx: Some(tx) }
755 }
756
757 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
758 if let Some(tx) = self.tx.take() {
759 let _ = tx.send(CacheAction::BlockWithSendersResult {
760 block_hash: self.blockhash,
761 res: block_sender,
762 });
763 }
764 }
765
766 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
767 if let Some(tx) = self.tx.take() {
768 let _ =
769 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
770 }
771 }
772
773 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
774 if let Some(tx) = self.tx.take() {
775 let _ = tx.send(CacheAction::HeaderResult {
776 block_hash: self.blockhash,
777 res: Box::new(header),
778 });
779 }
780 }
781}
782impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
783 fn drop(&mut self) {
784 if let Some(tx) = self.tx.take() {
785 let msg = match self.kind {
786 CacheKind::Block => CacheAction::BlockWithSendersResult {
787 block_hash: self.blockhash,
788 res: Err(CacheServiceUnavailable.into()),
789 },
790 CacheKind::Receipt => CacheAction::ReceiptsResult {
791 block_hash: self.blockhash,
792 res: Err(CacheServiceUnavailable.into()),
793 },
794 CacheKind::Header => CacheAction::HeaderResult {
795 block_hash: self.blockhash,
796 res: Box::new(Err(CacheServiceUnavailable.into())),
797 },
798 };
799 let _ = tx.send(msg);
800 }
801 }
802}
803
804pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
809 eth_state_cache: EthStateCache<N>,
810 mut events: St,
811) where
812 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
813{
814 while let Some(event) = events.next().await {
815 if let Some(reverted) = event.reverted() {
816 let chain_change = ChainChange::new(reverted);
817
818 let _ =
819 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
820 }
821
822 let chain_change = ChainChange::new(event.committed());
823
824 let _ =
825 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
826 }
827}