1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockHashOrNumber;
6use alloy_primitives::B256;
7use futures::{future::Either, stream::FuturesOrdered, Stream, StreamExt};
8use reth_chain_state::CanonStateNotification;
9use reth_errors::{ProviderError, ProviderResult};
10use reth_execution_types::Chain;
11use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
12use reth_storage_api::{BlockReader, TransactionVariant};
13use reth_tasks::{TaskSpawner, TokioTaskExecutor};
14use schnellru::{ByLength, Limiter};
15use std::{
16 future::Future,
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20};
21use tokio::sync::{
22 mpsc::{unbounded_channel, UnboundedSender},
23 oneshot, Semaphore,
24};
25use tokio_stream::wrappers::UnboundedReceiverStream;
26
27pub mod config;
28pub mod db;
29pub mod metrics;
30pub mod multi_consumer;
31
32type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;
34
35type BlockWithSendersResponseSender<B> =
37 oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
38
39type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
41
42type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;
43
44type CachedBlockAndReceiptsResponseSender<B, R> =
45 oneshot::Sender<(Option<Arc<RecoveredBlock<B>>>, Option<Arc<Vec<R>>>)>;
46
47type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
49
50type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
52
53type BlockLruCache<B, L> = MultiConsumerLruCache<
54 B256,
55 Arc<RecoveredBlock<B>>,
56 L,
57 Either<
58 BlockWithSendersResponseSender<B>,
59 BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
60 >,
61>;
62
63type ReceiptsLruCache<R, L> =
64 MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
65
66type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
67
68#[derive(Debug)]
73pub struct EthStateCache<N: NodePrimitives> {
74 to_service: UnboundedSender<CacheAction<N::Block, N::Receipt>>,
75}
76
77impl<N: NodePrimitives> Clone for EthStateCache<N> {
78 fn clone(&self) -> Self {
79 Self { to_service: self.to_service.clone() }
80 }
81}
82
83impl<N: NodePrimitives> EthStateCache<N> {
84 fn create<Provider, Tasks>(
86 provider: Provider,
87 action_task_spawner: Tasks,
88 max_blocks: u32,
89 max_receipts: u32,
90 max_headers: u32,
91 max_concurrent_db_operations: usize,
92 ) -> (Self, EthStateCacheService<Provider, Tasks>)
93 where
94 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
95 {
96 let (to_service, rx) = unbounded_channel();
97 let service = EthStateCacheService {
98 provider,
99 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
100 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
101 headers_cache: HeaderLruCache::new(max_headers, "headers"),
102 action_tx: to_service.clone(),
103 action_rx: UnboundedReceiverStream::new(rx),
104 action_task_spawner,
105 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
106 };
107 let cache = Self { to_service };
108 (cache, service)
109 }
110
111 pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
116 where
117 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
118 {
119 Self::spawn_with(provider, config, TokioTaskExecutor::default())
120 }
121
122 pub fn spawn_with<Provider, Tasks>(
127 provider: Provider,
128 config: EthStateCacheConfig,
129 executor: Tasks,
130 ) -> Self
131 where
132 Provider: BlockReader<Block = N::Block, Receipt = N::Receipt> + Clone + Unpin + 'static,
133 Tasks: TaskSpawner + Clone + 'static,
134 {
135 let EthStateCacheConfig {
136 max_blocks,
137 max_receipts,
138 max_headers,
139 max_concurrent_db_requests,
140 } = config;
141 let (this, service) = Self::create(
142 provider,
143 executor.clone(),
144 max_blocks,
145 max_receipts,
146 max_headers,
147 max_concurrent_db_requests,
148 );
149 executor.spawn_critical("eth state cache", Box::pin(service));
150 this
151 }
152
153 pub async fn get_recovered_block(
157 &self,
158 block_hash: B256,
159 ) -> ProviderResult<Option<Arc<RecoveredBlock<N::Block>>>> {
160 let (response_tx, rx) = oneshot::channel();
161 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
162 rx.await.map_err(|_| CacheServiceUnavailable)?
163 }
164
165 pub async fn get_receipts(
169 &self,
170 block_hash: B256,
171 ) -> ProviderResult<Option<Arc<Vec<N::Receipt>>>> {
172 let (response_tx, rx) = oneshot::channel();
173 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
174 rx.await.map_err(|_| CacheServiceUnavailable)?
175 }
176
177 pub async fn get_block_and_receipts(
179 &self,
180 block_hash: B256,
181 ) -> ProviderResult<Option<(Arc<RecoveredBlock<N::Block>>, Arc<Vec<N::Receipt>>)>> {
182 let block = self.get_recovered_block(block_hash);
183 let receipts = self.get_receipts(block_hash);
184
185 let (block, receipts) = futures::try_join!(block, receipts)?;
186
187 Ok(block.zip(receipts))
188 }
189
190 pub async fn get_receipts_and_maybe_block(
192 &self,
193 block_hash: B256,
194 ) -> ProviderResult<Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>> {
195 let (response_tx, rx) = oneshot::channel();
196 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
197
198 let receipts = self.get_receipts(block_hash);
199
200 let (receipts, block) = futures::join!(receipts, rx);
201
202 let block = block.map_err(|_| CacheServiceUnavailable)?;
203 Ok(receipts?.map(|r| (r, block)))
204 }
205
206 pub async fn maybe_cached_block_and_receipts(
208 &self,
209 block_hash: B256,
210 ) -> ProviderResult<(Option<Arc<RecoveredBlock<N::Block>>>, Option<Arc<Vec<N::Receipt>>>)> {
211 let (response_tx, rx) = oneshot::channel();
212 let _ = self
213 .to_service
214 .send(CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx });
215 rx.await.map_err(|_| CacheServiceUnavailable.into())
216 }
217
218 #[allow(clippy::type_complexity)]
220 pub fn get_receipts_and_maybe_block_stream<'a>(
221 &'a self,
222 hashes: Vec<B256>,
223 ) -> impl Stream<
224 Item = ProviderResult<
225 Option<(Arc<Vec<N::Receipt>>, Option<Arc<RecoveredBlock<N::Block>>>)>,
226 >,
227 > + 'a {
228 let futures = hashes.into_iter().map(move |hash| self.get_receipts_and_maybe_block(hash));
229
230 futures.collect::<FuturesOrdered<_>>()
231 }
232
233 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<N::BlockHeader> {
237 let (response_tx, rx) = oneshot::channel();
238 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
239 rx.await.map_err(|_| CacheServiceUnavailable)?
240 }
241
242 pub async fn get_cached_parent_blocks(
250 &self,
251 block_hash: B256,
252 max_blocks: usize,
253 ) -> Option<Vec<Arc<RecoveredBlock<N::Block>>>> {
254 let (response_tx, rx) = oneshot::channel();
255 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
256 block_hash,
257 max_blocks,
258 response_tx,
259 });
260
261 let blocks = rx.await.unwrap_or_default();
262 if blocks.is_empty() {
263 None
264 } else {
265 Some(blocks)
266 }
267 }
268}
269#[derive(Debug, thiserror::Error)]
271#[error("cache service task stopped")]
272pub struct CacheServiceUnavailable;
273
274impl From<CacheServiceUnavailable> for ProviderError {
275 fn from(err: CacheServiceUnavailable) -> Self {
276 Self::other(err)
277 }
278}
279
280#[must_use = "Type does nothing unless spawned"]
297pub(crate) struct EthStateCacheService<
298 Provider,
299 Tasks,
300 LimitBlocks = ByLength,
301 LimitReceipts = ByLength,
302 LimitHeaders = ByLength,
303> where
304 Provider: BlockReader,
305 LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
306 LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
307 LimitHeaders: Limiter<B256, Provider::Header>,
308{
309 provider: Provider,
311 full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
313 receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
315 headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
320 action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
322 action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
324 action_task_spawner: Tasks,
326 rate_limiter: Arc<Semaphore>,
330}
331
332impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
333where
334 Provider: BlockReader + Clone + Unpin + 'static,
335 Tasks: TaskSpawner + Clone + 'static,
336{
337 fn on_new_block(
338 &mut self,
339 block_hash: B256,
340 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
341 ) {
342 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
343 for tx in queued {
345 match tx {
346 Either::Left(block_with_senders) => {
347 let _ = block_with_senders.send(res.clone());
348 }
349 Either::Right(transaction_tx) => {
350 let _ = transaction_tx.send(res.clone().map(|maybe_block| {
351 maybe_block.map(|block| block.body().transactions().to_vec())
352 }));
353 }
354 }
355 }
356 }
357
358 if let Ok(Some(block)) = res {
360 self.full_block_cache.insert(block_hash, block);
361 }
362 }
363
364 fn on_new_receipts(
365 &mut self,
366 block_hash: B256,
367 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
368 ) {
369 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
370 for tx in queued {
372 let _ = tx.send(res.clone());
373 }
374 }
375
376 if let Ok(Some(receipts)) = res {
378 self.receipts_cache.insert(block_hash, receipts);
379 }
380 }
381
382 fn on_reorg_block(
383 &mut self,
384 block_hash: B256,
385 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
386 ) {
387 let res = res.map(|b| b.map(Arc::new));
388 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
389 for tx in queued {
391 match tx {
392 Either::Left(block_with_senders) => {
393 let _ = block_with_senders.send(res.clone());
394 }
395 Either::Right(transaction_tx) => {
396 let _ = transaction_tx.send(res.clone().map(|maybe_block| {
397 maybe_block.map(|block| block.body().transactions().to_vec())
398 }));
399 }
400 }
401 }
402 }
403 }
404
405 fn on_reorg_receipts(
406 &mut self,
407 block_hash: B256,
408 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
409 ) {
410 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
411 for tx in queued {
413 let _ = tx.send(res.clone());
414 }
415 }
416 }
417
418 fn shrink_queues(&mut self) {
420 let min_capacity = 2;
421 self.full_block_cache.shrink_to(min_capacity);
422 self.receipts_cache.shrink_to(min_capacity);
423 self.headers_cache.shrink_to(min_capacity);
424 }
425
426 fn update_cached_metrics(&self) {
427 self.full_block_cache.update_cached_metrics();
428 self.receipts_cache.update_cached_metrics();
429 self.headers_cache.update_cached_metrics();
430 }
431}
432
433impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
434where
435 Provider: BlockReader + Clone + Unpin + 'static,
436 Tasks: TaskSpawner + Clone + 'static,
437{
438 type Output = ();
439
440 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441 let this = self.get_mut();
442
443 loop {
444 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
445 this.shrink_queues();
447 return Poll::Pending;
448 };
449
450 match action {
451 None => {
452 unreachable!("can't close")
453 }
454 Some(action) => {
455 match action {
456 CacheAction::GetCachedBlock { block_hash, response_tx } => {
457 let _ =
458 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
459 }
460 CacheAction::GetCachedBlockAndReceipts { block_hash, response_tx } => {
461 let block = this.full_block_cache.get(&block_hash).cloned();
462 let receipts = this.receipts_cache.get(&block_hash).cloned();
463 let _ = response_tx.send((block, receipts));
464 }
465 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
466 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
467 let _ = response_tx.send(Ok(Some(block)));
468 continue
469 }
470
471 if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
473 let provider = this.provider.clone();
474 let action_tx = this.action_tx.clone();
475 let rate_limiter = this.rate_limiter.clone();
476 let mut action_sender =
477 ActionSender::new(CacheKind::Block, block_hash, action_tx);
478 this.action_task_spawner.spawn_blocking(Box::pin(async move {
479 let _permit = rate_limiter.acquire().await;
481 let block_sender = provider
484 .sealed_block_with_senders(
485 BlockHashOrNumber::Hash(block_hash),
486 TransactionVariant::WithHash,
487 )
488 .map(|maybe_block| maybe_block.map(Arc::new));
489 action_sender.send_block(block_sender);
490 }));
491 }
492 }
493 CacheAction::GetReceipts { block_hash, response_tx } => {
494 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
496 let _ = response_tx.send(Ok(Some(receipts)));
497 continue
498 }
499
500 if this.receipts_cache.queue(block_hash, response_tx) {
502 let provider = this.provider.clone();
503 let action_tx = this.action_tx.clone();
504 let rate_limiter = this.rate_limiter.clone();
505 let mut action_sender =
506 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
507 this.action_task_spawner.spawn_blocking(Box::pin(async move {
508 let _permit = rate_limiter.acquire().await;
510 let res = provider
511 .receipts_by_block(block_hash.into())
512 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
513
514 action_sender.send_receipts(res);
515 }));
516 }
517 }
518 CacheAction::GetHeader { block_hash, response_tx } => {
519 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
521 let _ = response_tx.send(Ok(header));
522 continue
523 }
524
525 if let Some(block) = this.full_block_cache.get(&block_hash) {
527 let _ = response_tx.send(Ok(block.clone_header()));
528 continue
529 }
530
531 if this.headers_cache.queue(block_hash, response_tx) {
534 let provider = this.provider.clone();
535 let action_tx = this.action_tx.clone();
536 let rate_limiter = this.rate_limiter.clone();
537 let mut action_sender =
538 ActionSender::new(CacheKind::Header, block_hash, action_tx);
539 this.action_task_spawner.spawn_blocking(Box::pin(async move {
540 let _permit = rate_limiter.acquire().await;
542 let header = provider.header(&block_hash).and_then(|header| {
543 header.ok_or_else(|| {
544 ProviderError::HeaderNotFound(block_hash.into())
545 })
546 });
547 action_sender.send_header(header);
548 }));
549 }
550 }
551 CacheAction::ReceiptsResult { block_hash, res } => {
552 this.on_new_receipts(block_hash, res);
553 }
554 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
555 Ok(Some(block_with_senders)) => {
556 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
557 }
558 Ok(None) => {
559 this.on_new_block(block_hash, Ok(None));
560 }
561 Err(e) => {
562 this.on_new_block(block_hash, Err(e));
563 }
564 },
565 CacheAction::HeaderResult { block_hash, res } => {
566 let res = *res;
567 if let Some(queued) = this.headers_cache.remove(&block_hash) {
568 for tx in queued {
570 let _ = tx.send(res.clone());
571 }
572 }
573
574 if let Ok(data) = res {
576 this.headers_cache.insert(block_hash, data);
577 }
578 }
579 CacheAction::CacheNewCanonicalChain { chain_change } => {
580 for block in chain_change.blocks {
581 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
582 }
583
584 for block_receipts in chain_change.receipts {
585 this.on_new_receipts(
586 block_receipts.block_hash,
587 Ok(Some(Arc::new(block_receipts.receipts))),
588 );
589 }
590 }
591 CacheAction::RemoveReorgedChain { chain_change } => {
592 for block in chain_change.blocks {
593 this.on_reorg_block(block.hash(), Ok(Some(block)));
594 }
595
596 for block_receipts in chain_change.receipts {
597 this.on_reorg_receipts(
598 block_receipts.block_hash,
599 Ok(Some(Arc::new(block_receipts.receipts))),
600 );
601 }
602 }
603 CacheAction::GetCachedParentBlocks {
604 block_hash,
605 max_blocks,
606 response_tx,
607 } => {
608 let mut blocks = Vec::new();
609 let mut current_hash = block_hash;
610
611 while blocks.len() < max_blocks {
613 if let Some(block) =
614 this.full_block_cache.get(¤t_hash).cloned()
615 {
616 current_hash = block.header().parent_hash();
618 blocks.push(block);
619 } else {
620 break;
622 }
623 }
624
625 let _ = response_tx.send(blocks);
626 }
627 };
628 this.update_cached_metrics();
629 }
630 }
631 }
632 }
633}
634
635enum CacheAction<B: Block, R> {
637 GetBlockWithSenders {
638 block_hash: B256,
639 response_tx: BlockWithSendersResponseSender<B>,
640 },
641 GetHeader {
642 block_hash: B256,
643 response_tx: HeaderResponseSender<B::Header>,
644 },
645 GetReceipts {
646 block_hash: B256,
647 response_tx: ReceiptsResponseSender<R>,
648 },
649 GetCachedBlock {
650 block_hash: B256,
651 response_tx: CachedBlockResponseSender<B>,
652 },
653 GetCachedBlockAndReceipts {
654 block_hash: B256,
655 response_tx: CachedBlockAndReceiptsResponseSender<B, R>,
656 },
657 BlockWithSendersResult {
658 block_hash: B256,
659 res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
660 },
661 ReceiptsResult {
662 block_hash: B256,
663 res: ProviderResult<Option<Arc<Vec<R>>>>,
664 },
665 HeaderResult {
666 block_hash: B256,
667 res: Box<ProviderResult<B::Header>>,
668 },
669 CacheNewCanonicalChain {
670 chain_change: ChainChange<B, R>,
671 },
672 RemoveReorgedChain {
673 chain_change: ChainChange<B, R>,
674 },
675 GetCachedParentBlocks {
676 block_hash: B256,
677 max_blocks: usize,
678 response_tx: CachedParentBlocksResponseSender<B>,
679 },
680}
681
682struct BlockReceipts<R> {
683 block_hash: B256,
684 receipts: Vec<R>,
685}
686
687struct ChainChange<B: Block, R> {
689 blocks: Vec<RecoveredBlock<B>>,
690 receipts: Vec<BlockReceipts<R>>,
691}
692
693impl<B: Block, R: Clone> ChainChange<B, R> {
694 fn new<N>(chain: Arc<Chain<N>>) -> Self
695 where
696 N: NodePrimitives<Block = B, Receipt = R>,
697 {
698 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
699 .blocks_and_receipts()
700 .map(|(block, receipts)| {
701 let block_receipts =
702 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
703 (block.clone(), block_receipts)
704 })
705 .unzip();
706 Self { blocks, receipts }
707 }
708}
709
710#[derive(Copy, Clone, Debug)]
712enum CacheKind {
713 Block,
714 Receipt,
715 Header,
716}
717
718#[derive(Debug)]
723struct ActionSender<B: Block, R: Send + Sync> {
724 kind: CacheKind,
725 blockhash: B256,
726 tx: Option<UnboundedSender<CacheAction<B, R>>>,
727}
728
729impl<R: Send + Sync, B: Block> ActionSender<B, R> {
730 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
731 Self { kind, blockhash, tx: Some(tx) }
732 }
733
734 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
735 if let Some(tx) = self.tx.take() {
736 let _ = tx.send(CacheAction::BlockWithSendersResult {
737 block_hash: self.blockhash,
738 res: block_sender,
739 });
740 }
741 }
742
743 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
744 if let Some(tx) = self.tx.take() {
745 let _ =
746 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
747 }
748 }
749
750 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
751 if let Some(tx) = self.tx.take() {
752 let _ = tx.send(CacheAction::HeaderResult {
753 block_hash: self.blockhash,
754 res: Box::new(header),
755 });
756 }
757 }
758}
759impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
760 fn drop(&mut self) {
761 if let Some(tx) = self.tx.take() {
762 let msg = match self.kind {
763 CacheKind::Block => CacheAction::BlockWithSendersResult {
764 block_hash: self.blockhash,
765 res: Err(CacheServiceUnavailable.into()),
766 },
767 CacheKind::Receipt => CacheAction::ReceiptsResult {
768 block_hash: self.blockhash,
769 res: Err(CacheServiceUnavailable.into()),
770 },
771 CacheKind::Header => CacheAction::HeaderResult {
772 block_hash: self.blockhash,
773 res: Box::new(Err(CacheServiceUnavailable.into())),
774 },
775 };
776 let _ = tx.send(msg);
777 }
778 }
779}
780
781pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
786 eth_state_cache: EthStateCache<N>,
787 mut events: St,
788) where
789 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
790{
791 while let Some(event) = events.next().await {
792 if let Some(reverted) = event.reverted() {
793 let chain_change = ChainChange::new(reverted);
794
795 let _ =
796 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
797 }
798
799 let chain_change = ChainChange::new(event.committed());
800
801 let _ =
802 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
803 }
804}