1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use crate::block::CachedTransaction;
5use alloy_consensus::{transaction::TxHashRef, BlockHeader};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{TxHash, B256};
8use futures::{stream::FuturesOrdered, Stream, StreamExt};
9use reth_chain_state::CanonStateNotification;
10use reth_errors::{ProviderError, ProviderResult};
11use reth_execution_types::Chain;
12use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
13use reth_storage_api::{BlockReader, TransactionVariant};
14use reth_tasks::{TaskSpawner, TokioTaskExecutor};
15use schnellru::{ByLength, Limiter, LruMap};
16use std::{
17 future::Future,
18 pin::Pin,
19 sync::Arc,
20 task::{Context, Poll},
21};
22use tokio::sync::{
23 mpsc::{unbounded_channel, UnboundedSender},
24 oneshot, Semaphore,
25};
26use tokio_stream::wrappers::UnboundedReceiverStream;
27
28pub mod config;
29pub mod db;
30pub mod metrics;
31pub mod multi_consumer;
32
/// One-shot sender used to answer a request for a recovered block; carries the provider result
/// so lookup errors reach the requester.
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// One-shot sender used to answer a request for the receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// One-shot sender used to answer a cache-only block lookup (`None` when the block is not
/// cached).
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// One-shot sender used to answer a cache-only lookup of both the block and its receipts.
type CachedBlockAndReceiptsResponseSender<B, R> =
    oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;

/// One-shot sender used to answer a header request.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// One-shot sender used to answer a request for a run of cached ancestor blocks.
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// One-shot sender used to answer a cache-only transaction-by-hash lookup.
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

/// [`MultiConsumerLruCache`] of recovered blocks keyed by `B256`, whose queued consumers are
/// [`BlockWithSendersResponseSender`]s.
type BlockLruCache<B, L> =
    MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

/// [`MultiConsumerLruCache`] of receipt lists keyed by `B256`, whose queued consumers are
/// [`ReceiptsResponseSender`]s.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// [`MultiConsumerLruCache`] of headers keyed by `B256`, whose queued consumers are
/// [`HeaderResponseSender`]s.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
61
/// Frontend handle for the cache service.
///
/// Every request is turned into a [`CacheAction`] and pushed over an unbounded channel to the
/// [`EthStateCacheService`] task, which replies through a one-shot channel.
#[derive(Debug)]
pub struct EthStateCache<N: NodePrimitives> {
    /// Sending half of the channel the service task reads its actions from.
    to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
}
70
71impl<N: NodePrimitives> Clone for EthStateCache<N> {
72 fn clone(&self) -> Self {
73 Self { to_service: self.to_service.clone() }
74 }
75}
76
77impl<N: NodePrimitives> EthStateCache<N> {
78 fn create<Provider, Tasks>(
80 provider: Provider,
81 action_task_spawner: Tasks,
82 max_blocks: u32,
83 max_receipts: u32,
84 max_headers: u32,
85 max_concurrent_db_operations: usize,
86 max_cached_tx_hashes: u32,
87 ) -> (Self, EthStateCacheService<Provider, Tasks>)
88 where
89 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
90 {
91 let (to_service, rx) = unbounded_channel();
92
93 let service = EthStateCacheService {
94 provider,
95 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
96 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
97 headers_cache: HeaderLruCache::new(max_headers, "headers"),
98 action_tx: to_service.clone(),
99 action_rx: UnboundedReceiverStream::new(rx),
100 action_task_spawner,
101 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
102 tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
103 };
104 let cache = Self { to_service };
105 (cache, service)
106 }
107
108 pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
113 where
114 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
115 {
116 Self::spawn_with(provider, config, TokioTaskExecutor::default())
117 }
118
119 pub fn spawn_with<Provider, Tasks>(
124 provider: Provider,
125 config: EthStateCacheConfig,
126 executor: Tasks,
127 ) -> Self
128 where
129 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
130 Tasks: TaskSpawner + Clone + 'static,
131 {
132 let EthStateCacheConfig {
133 max_blocks,
134 max_receipts,
135 max_headers,
136 max_concurrent_db_requests,
137 max_cached_tx_hashes,
138 } = config;
139 let (this, service) = Self::create(
140 provider,
141 executor.clone(),
142 max_blocks,
143 max_receipts,
144 max_headers,
145 max_concurrent_db_requests,
146 max_cached_tx_hashes,
147 );
148 executor.spawn_critical_task("eth state cache", Box::pin(service));
149 this
150 }
151
152 pub async fn get_recovered_block(
156 &self,
157 block_hash: B256,
158 ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
159 let (response_tx, rx) = oneshot::channel();
160 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
161 rx.await.map_err(|_| CacheServiceUnavailable)?
162 }
163
164 pub async fn get_receipts(
168 &self,
169 block_hash: B256,
170 ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
171 let (response_tx, rx) = oneshot::channel();
172 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
173 rx.await.map_err(|_| CacheServiceUnavailable)?
174 }
175
176 pub async fn get_block_and_receipts(
178 &self,
179 block_hash: B256,
180 ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
181 let block = self.get_recovered_block(block_hash);
182 let receipts = self.get_receipts(block_hash);
183
184 let (block, receipts) = futures::try_join!(block, receipts)?;
185
186 Ok(block.zip(receipts))
187 }
188
189 pub async fn get_receipts_and_maybe_block(
191 &self,
192 block_hash: B256,
193 ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
194 let (response_tx, rx) = oneshot::channel();
195 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
196
197 let receipts = self.get_receipts(block_hash);
198
199 let (receipts, block) = futures::join!(receipts, rx);
200
201 let block = block.map_err(|_| CacheServiceUnavailable)?;
202 Ok(receipts?.map(|r| (r, block)))
203 }
204
205 pub async fn maybe_cached_block_and_receipts(
207 &self,
208 block_hash: B256,
209 ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
210 let (response_tx, rx) = oneshot::channel();
211 let _ = self
212 .to_service
213 .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
214 rx.await.map_err(|_| CacheServiceUnavailable.into())
215 }
216
217 #[allow(clippy::type_complexity)]
219 pub fn get_receipts_and_maybe_block_stream<'a>(
220 &'a self,
221 hashes: Vec<B256>,
222 ) -> impl Stream<
223 Item = ProviderResult<
224 Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
225 >,
226 > + 'a {
227 let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
228
229 futures.collect::<FuturesOrdered<_>>()
230 }
231
232 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
236 let (response_tx, rx) = oneshot::channel();
237 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
238 rx.await.map_err(|_| CacheServiceUnavailable)?
239 }
240
241 pub async fn get_cached_parent_blocks(
249 &self,
250 block_hash: B256,
251 max_blocks: usize,
252 ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
253 let (response_tx, rx) = oneshot::channel();
254 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
255 block_hash,
256 max_blocks,
257 response_tx,
258 });
259
260 let blocks = rx.await.unwrap_or_default();
261 if blocks.is_empty() {
262 None
263 } else {
264 Some(blocks)
265 }
266 }
267
268 pub async fn get_transaction_by_hash(
273 &self,
274 tx_hash: TxHash,
275 ) -> Option<CachedTransaction<N::Block, N::Receipt>> {
276 let (response_tx, rx) = oneshot::channel();
277 let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
278 rx.await.ok()?
279 }
280}
/// Error used when a request could not be answered because the reply channel to the cache
/// service was dropped.
#[derive(Debug, thiserror::Error)]
#[error("cache service task stopped")]
pub struct CacheServiceUnavailable;
285
/// Lets [`CacheServiceUnavailable`] be propagated with `?` from functions returning a
/// [`ProviderResult`].
impl From<CacheServiceUnavailable> for ProviderError {
    fn from(err: CacheServiceUnavailable) -> Self {
        // Wrapped through the `other` constructor of `ProviderError`.
        Self::other(err)
    }
}
291
/// The task that owns the caches and processes [`CacheAction`]s.
///
/// It is a [`Future`] that drains the action channel: cache hits are answered directly, while
/// misses are queued and resolved by tasks spawned on `action_task_spawner`, which report their
/// results back through the same channel.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
> where
    Provider: BlockReader,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
{
    /// Source the spawned tasks read blocks, receipts and headers from on a cache miss.
    provider: Provider,
    /// Cache of recovered blocks, plus the senders waiting on in-flight block lookups.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// Cache of receipts, plus the senders waiting on in-flight receipt lookups.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// Cache of headers, plus the senders waiting on in-flight header lookups.
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Sender for the service's own action channel; cloned into spawned tasks so they can
    /// deliver their results.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiving half of the action channel, polled by the service.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Spawner used for the lookup tasks.
    action_task_spawner: Tasks,
    /// Semaphore every lookup task acquires a permit from before querying the provider.
    rate_limiter: Arc<Semaphore>,
    /// Maps a transaction hash to the hash of its block and the transaction's index in it.
    tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}
345
346impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
347where
348 Provider: BlockReader + Clone + Unpin + 'static,
349 Tasks: TaskSpawner + Clone + 'static,
350{
351 fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
353 let block_hash = block.hash();
354 for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
355 self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
356 }
357 }
358
359 fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
364 let block_hash = block.hash();
365 for tx in block.body().transactions() {
366 if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
367 *mapped_hash == block_hash
368 {
369 self.tx_hash_index.remove(tx.tx_hash());
370 }
371 }
372 }
373
374 fn on_new_block(
375 &mut self,
376 block_hash: B256,
377 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
378 ) {
379 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
380 for tx in queued {
382 let _ = tx.send(res.clone());
383 }
384 }
385
386 if let Ok(Some(block)) = res {
388 self.full_block_cache.insert(block_hash, block);
389 }
390 }
391
392 fn on_new_receipts(
393 &mut self,
394 block_hash: B256,
395 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
396 ) {
397 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
398 for tx in queued {
400 let _ = tx.send(res.clone());
401 }
402 }
403
404 if let Ok(Some(receipts)) = res {
406 self.receipts_cache.insert(block_hash, receipts);
407 }
408 }
409
410 fn on_reorg_block(
411 &mut self,
412 block_hash: B256,
413 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
414 ) {
415 let res = res.map(|b| b.map(Arc::new));
416 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
417 for tx in queued {
419 let _ = tx.send(res.clone());
420 }
421 }
422 }
423
424 fn on_reorg_receipts(
425 &mut self,
426 block_hash: B256,
427 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
428 ) {
429 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
430 for tx in queued {
432 let _ = tx.send(res.clone());
433 }
434 }
435 }
436
437 fn shrink_queues(&mut self) {
439 let min_capacity = 2;
440 self.full_block_cache.shrink_to(min_capacity);
441 self.receipts_cache.shrink_to(min_capacity);
442 self.headers_cache.shrink_to(min_capacity);
443 }
444
445 fn update_cached_metrics(&self) {
446 self.full_block_cache.update_cached_metrics();
447 self.receipts_cache.update_cached_metrics();
448 self.headers_cache.update_cached_metrics();
449 }
450}
451
452impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
453where
454 Provider: BlockReader + Clone + Unpin + 'static,
455 Tasks: TaskSpawner + Clone + 'static,
456{
457 type Output = ();
458
459 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
460 let this = self.get_mut();
461
462 loop {
463 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
464 this.shrink_queues();
466 return Poll::Pending;
467 };
468
469 match action {
470 None => {
471 unreachable!("can't close")
472 }
473 Some(action) => {
474 match action {
475 CacheAction::GetCachedBlock { block_hash, response_tx } => {
476 let _ =
477 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
478 }
479 CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
480 let block = this.full_block_cache.get(&block_hash).cloned();
481 let receipts = this.receipts_cache.get(&block_hash).cloned();
482 let _ = response_tx.send((block, receipts));
483 }
484 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
485 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
486 let _ = response_tx.send(Ok(Some(block)));
487 continue
488 }
489
490 if this.full_block_cache.queue(block_hash, response_tx) {
492 let provider = this.provider.clone();
493 let action_tx = this.action_tx.clone();
494 let rate_limiter = this.rate_limiter.clone();
495 let mut action_sender =
496 ActionSender::new(CacheKind::Block, block_hash, action_tx);
497 this.action_task_spawner.spawn_blocking_task(Box::pin(
498 async move {
499 let _permit = rate_limiter.acquire().await;
501 let block_sender = provider
504 .sealed_block_with_senders(
505 BlockHashOrNumber::Hash(block_hash),
506 TransactionVariant::WithHash,
507 )
508 .map(|maybe_block| maybe_block.map(Arc::new));
509 action_sender.send_block(block_sender);
510 },
511 ));
512 }
513 }
514 CacheAction::GetReceipts { block_hash, response_tx } => {
515 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
517 let _ = response_tx.send(Ok(Some(receipts)));
518 continue
519 }
520
521 if this.receipts_cache.queue(block_hash, response_tx) {
523 let provider = this.provider.clone();
524 let action_tx = this.action_tx.clone();
525 let rate_limiter = this.rate_limiter.clone();
526 let mut action_sender =
527 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
528 this.action_task_spawner.spawn_blocking_task(Box::pin(
529 async move {
530 let _permit = rate_limiter.acquire().await;
532 let res = provider
533 .receipts_by_block(block_hash.into())
534 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
535
536 action_sender.send_receipts(res);
537 },
538 ));
539 }
540 }
541 CacheAction::GetHeader { block_hash, response_tx } => {
542 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
544 let _ = response_tx.send(Ok(header));
545 continue
546 }
547
548 if let Some(block) = this.full_block_cache.get(&block_hash) {
550 let _ = response_tx.send(Ok(block.clone_header()));
551 continue
552 }
553
554 if this.headers_cache.queue(block_hash, response_tx) {
557 let provider = this.provider.clone();
558 let action_tx = this.action_tx.clone();
559 let rate_limiter = this.rate_limiter.clone();
560 let mut action_sender =
561 ActionSender::new(CacheKind::Header, block_hash, action_tx);
562 this.action_task_spawner.spawn_blocking_task(Box::pin(
563 async move {
564 let _permit = rate_limiter.acquire().await;
566 let header =
567 provider.header(block_hash).and_then(|header| {
568 header.ok_or_else(|| {
569 ProviderError::HeaderNotFound(block_hash.into())
570 })
571 });
572 action_sender.send_header(header);
573 },
574 ));
575 }
576 }
577 CacheAction::ReceiptsResult { block_hash, res } => {
578 this.on_new_receipts(block_hash, res);
579 }
580 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
581 Ok(Some(block_with_senders)) => {
582 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
583 }
584 Ok(None) => {
585 this.on_new_block(block_hash, Ok(None));
586 }
587 Err(e) => {
588 this.on_new_block(block_hash, Err(e));
589 }
590 },
591 CacheAction::HeaderResult { block_hash, res } => {
592 let res = *res;
593 if let Some(queued) = this.headers_cache.remove(&block_hash) {
594 for tx in queued {
596 let _ = tx.send(res.clone());
597 }
598 }
599
600 if let Ok(data) = res {
602 this.headers_cache.insert(block_hash, data);
603 }
604 }
605 CacheAction::CacheNewCanonicalChain { chain_change } => {
606 for block in chain_change.blocks {
607 this.index_block_transactions(&block);
609 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
610 }
611
612 for block_receipts in chain_change.receipts {
613 this.on_new_receipts(
614 block_receipts.block_hash,
615 Ok(Some(Arc::new(block_receipts.receipts))),
616 );
617 }
618 }
619 CacheAction::RemoveReorgedChain { chain_change } => {
620 for block in chain_change.blocks {
621 this.remove_block_transactions(&block);
623 this.on_reorg_block(block.hash(), Ok(Some(block)));
624 }
625
626 for block_receipts in chain_change.receipts {
627 this.on_reorg_receipts(
628 block_receipts.block_hash,
629 Ok(Some(Arc::new(block_receipts.receipts))),
630 );
631 }
632 }
633 CacheAction::GetCachedParentBlocks {
634 block_hash,
635 max_blocks,
636 response_tx,
637 } => {
638 let mut blocks = Vec::new();
639 let mut current_hash = block_hash;
640
641 while blocks.len() < max_blocks {
643 if let Some(block) =
644 this.full_block_cache.get(¤t_hash).cloned()
645 {
646 current_hash = block.header().parent_hash();
648 blocks.push(block);
649 } else {
650 break;
652 }
653 }
654
655 let _ = response_tx.send(blocks);
656 }
657 CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
658 let result =
659 this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
660 let block = this.full_block_cache.get(block_hash).cloned()?;
661 let receipts = this.receipts_cache.get(block_hash).cloned();
662 Some(CachedTransaction::new(block, *idx, receipts))
663 });
664 let _ = response_tx.send(result);
665 }
666 };
667 this.update_cached_metrics();
668 }
669 }
670 }
671 }
672}
673
/// All messages the cache service processes, whether sent by the frontend, by spawned lookup
/// tasks, or by the canonical-state listener.
enum CacheAction<B: Block, R> {
    /// Request for a recovered block; loaded through the provider on a cache miss.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request for a header; loaded through the provider on a cache miss.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request for the receipts of a block; loaded through the provider on a cache miss.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Cache-only lookup of a block.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Cache-only lookup of a block and its receipts.
    GetCachedBlockAndReceipts {
        block_hash: B256,
        response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
    },
    /// Result of a block lookup, reported by a spawned task.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Result of a receipts lookup, reported by a spawned task.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Result of a header lookup, reported by a spawned task (boxed).
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Newly committed blocks and receipts to index and cache.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Reverted blocks and receipts to remove from the caches and the transaction index.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Cache-only request for up to `max_blocks` blocks, walking parent hashes from
    /// `block_hash`.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
    /// Cache-only lookup of a transaction through the transaction hash index.
    GetTransactionByHash {
        tx_hash: TxHash,
        response_tx: TransactionHashResponseSender<B, R>,
    },
}
725
/// The receipts of a single block, paired with that block's hash.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// The block's receipts.
    receipts: Vec<R>,
}
730
/// The blocks and receipts of a chain segment, as carried by the
/// `CacheNewCanonicalChain` and `RemoveReorgedChain` actions.
struct ChainChange<B: Block, R> {
    /// Blocks of the segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// Receipts of the segment, one entry per block.
    receipts: Vec<BlockReceipts<R>>,
}
736
737impl<B: Block, R: Clone> ChainChange<B, R> {
738 fn new<N>(chain: Arc<Chain<N>>) -> Self
739 where
740 N: NodePrimitives<Block = B, Receipt = R>,
741 {
742 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
743 .blocks_and_receipts()
744 .map(|(block, receipts)| {
745 let block_receipts =
746 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
747 (block.clone(), block_receipts)
748 })
749 .unzip();
750 Self { blocks, receipts }
751 }
752}
753
/// Identifies which kind of lookup an [`ActionSender`] reports on, so that its `Drop`
/// implementation can build the matching result action.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// A block lookup.
    Block,
    /// A receipts lookup.
    Receipt,
    /// A header lookup.
    Header,
}
761
/// Reports the outcome of a spawned lookup back to the service.
///
/// The sender is taken on the first send; if the value is dropped while the sender is still
/// present, `Drop` sends an error result instead.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Which kind of result this sender reports.
    kind: CacheKind,
    /// Hash of the block the lookup is for.
    blockhash: B256,
    /// Channel to the service; `None` once a result has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
772
773impl<R: Send + Sync, B: Block> ActionSender<B, R> {
774 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
775 Self { kind, blockhash, tx: Some(tx) }
776 }
777
778 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
779 if let Some(tx) = self.tx.take() {
780 let _ = tx.send(CacheAction::BlockWithSendersResult {
781 block_hash: self.blockhash,
782 res: block_sender,
783 });
784 }
785 }
786
787 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
788 if let Some(tx) = self.tx.take() {
789 let _ =
790 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
791 }
792 }
793
794 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
795 if let Some(tx) = self.tx.take() {
796 let _ = tx.send(CacheAction::HeaderResult {
797 block_hash: self.blockhash,
798 res: Box::new(header),
799 });
800 }
801 }
802}
803impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
804 fn drop(&mut self) {
805 if let Some(tx) = self.tx.take() {
806 let msg = match self.kind {
807 CacheKind::Block => CacheAction::BlockWithSendersResult {
808 block_hash: self.blockhash,
809 res: Err(CacheServiceUnavailable.into()),
810 },
811 CacheKind::Receipt => CacheAction::ReceiptsResult {
812 block_hash: self.blockhash,
813 res: Err(CacheServiceUnavailable.into()),
814 },
815 CacheKind::Header => CacheAction::HeaderResult {
816 block_hash: self.blockhash,
817 res: Box::new(Err(CacheServiceUnavailable.into())),
818 },
819 };
820 let _ = tx.send(msg);
821 }
822 }
823}
824
825pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
830 eth_state_cache: EthStateCache<N>,
831 mut events: St,
832) where
833 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
834{
835 while let Some(event) = events.next().await {
836 if let Some(reverted) = event.reverted() {
837 let chain_change = ChainChange::new(reverted);
838
839 let _ =
840 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
841 }
842
843 let chain_change = ChainChange::new(event.committed());
844
845 let _ =
846 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
847 }
848}