1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use alloy_consensus::BlockHeader;
5use alloy_eips::BlockHashOrNumber;
6use alloy_primitives::B256;
7use futures::{future::Either, Stream, StreamExt};
8use reth_chain_state::CanonStateNotification;
9use reth_errors::{ProviderError, ProviderResult};
10use reth_execution_types::Chain;
11use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
12use reth_storage_api::{BlockReader, TransactionVariant};
13use reth_tasks::{TaskSpawner, TokioTaskExecutor};
14use schnellru::{ByLength, Limiter};
15use std::{
16 future::Future,
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20};
21use tokio::sync::{
22 mpsc::{unbounded_channel, UnboundedSender},
23 oneshot, Semaphore,
24};
25use tokio_stream::wrappers::UnboundedReceiverStream;
26
27pub mod config;
28pub mod db;
29pub mod metrics;
30pub mod multi_consumer;
31
/// One-shot sender used to answer a request for the transactions of a block.
///
/// Carries `Ok(None)` when the block is unknown and `Err(_)` when the lookup failed.
type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;

/// One-shot sender used to answer a request for a full block with recovered senders.
type BlockWithSendersResponseSender<B> =
    oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;

/// One-shot sender used to answer a request for the receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;

/// One-shot sender used to answer a cache-only block lookup (`None` means not cached).
type CachedBlockResponseSender<B> = oneshot::Sender<Option<Arc<RecoveredBlock<B>>>>;

/// One-shot sender used to answer a request for a block header.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;

/// One-shot sender used to answer a request for a chain of cached parent blocks.
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;
49
/// LRU cache of recovered blocks keyed by block hash.
///
/// Queued consumers are either waiting for the whole block (`Either::Left`) or only for
/// its transactions (`Either::Right`).
type BlockLruCache<B, L> = MultiConsumerLruCache<
    B256,
    Arc<RecoveredBlock<B>>,
    L,
    Either<
        BlockWithSendersResponseSender<B>,
        BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
    >,
>;

/// LRU cache of block receipts keyed by block hash.
type ReceiptsLruCache<R, L> =
    MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;

/// LRU cache of block headers keyed by block hash.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
64
/// Cloneable handle to the cache service.
///
/// Every request is turned into a [`CacheAction`] and sent over an unbounded channel to
/// the [`EthStateCacheService`], which answers through a one-shot channel.
#[derive(Debug)]
pub struct EthStateCache<B: Block, R> {
    /// Channel used to submit actions to the service.
    to_service: UnboundedSender<CacheAction<B, R>>,
}
73
74impl<B: Block, R> Clone for EthStateCache<B, R> {
75 fn clone(&self) -> Self {
76 Self { to_service: self.to_service.clone() }
77 }
78}
79
80impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
81 fn create<Provider, Tasks>(
83 provider: Provider,
84 action_task_spawner: Tasks,
85 max_blocks: u32,
86 max_receipts: u32,
87 max_headers: u32,
88 max_concurrent_db_operations: usize,
89 ) -> (Self, EthStateCacheService<Provider, Tasks>)
90 where
91 Provider: BlockReader<Block = B, Receipt = R>,
92 {
93 let (to_service, rx) = unbounded_channel();
94 let service = EthStateCacheService {
95 provider,
96 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
97 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
98 headers_cache: HeaderLruCache::new(max_headers, "headers"),
99 action_tx: to_service.clone(),
100 action_rx: UnboundedReceiverStream::new(rx),
101 action_task_spawner,
102 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
103 };
104 let cache = Self { to_service };
105 (cache, service)
106 }
107
108 pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
113 where
114 Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
115 {
116 Self::spawn_with(provider, config, TokioTaskExecutor::default())
117 }
118
119 pub fn spawn_with<Provider, Tasks>(
124 provider: Provider,
125 config: EthStateCacheConfig,
126 executor: Tasks,
127 ) -> Self
128 where
129 Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
130 Tasks: TaskSpawner + Clone + 'static,
131 {
132 let EthStateCacheConfig {
133 max_blocks,
134 max_receipts,
135 max_headers,
136 max_concurrent_db_requests,
137 } = config;
138 let (this, service) = Self::create(
139 provider,
140 executor.clone(),
141 max_blocks,
142 max_receipts,
143 max_headers,
144 max_concurrent_db_requests,
145 );
146 executor.spawn_critical("eth state cache", Box::pin(service));
147 this
148 }
149
150 pub async fn get_recovered_block(
154 &self,
155 block_hash: B256,
156 ) -> ProviderResult<Option<Arc<RecoveredBlock<B>>>> {
157 let (response_tx, rx) = oneshot::channel();
158 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
159 rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
160 }
161
162 pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult<Option<Arc<Vec<R>>>> {
166 let (response_tx, rx) = oneshot::channel();
167 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
168 rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
169 }
170
171 pub async fn get_block_and_receipts(
173 &self,
174 block_hash: B256,
175 ) -> ProviderResult<Option<(Arc<RecoveredBlock<B>>, Arc<Vec<R>>)>> {
176 let block = self.get_recovered_block(block_hash);
177 let receipts = self.get_receipts(block_hash);
178
179 let (block, receipts) = futures::try_join!(block, receipts)?;
180
181 Ok(block.zip(receipts))
182 }
183
184 pub async fn get_receipts_and_maybe_block(
186 &self,
187 block_hash: B256,
188 ) -> ProviderResult<Option<(Arc<Vec<R>>, Option<Arc<RecoveredBlock<B>>>)>> {
189 let (response_tx, rx) = oneshot::channel();
190 let _ = self.to_service.send(CacheAction::GetCachedBlock { block_hash, response_tx });
191
192 let receipts = self.get_receipts(block_hash);
193
194 let (receipts, block) = futures::join!(receipts, rx);
195
196 let block = block.map_err(|_| ProviderError::CacheServiceUnavailable)?;
197 Ok(receipts?.map(|r| (r, block)))
198 }
199
200 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<B::Header> {
204 let (response_tx, rx) = oneshot::channel();
205 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
206 rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
207 }
208
209 pub async fn get_cached_parent_blocks(
217 &self,
218 block_hash: B256,
219 max_blocks: usize,
220 ) -> Option<Vec<Arc<RecoveredBlock<B>>>> {
221 let (response_tx, rx) = oneshot::channel();
222 let _ = self.to_service.send(CacheAction::GetCachedParentBlocks {
223 block_hash,
224 max_blocks,
225 response_tx,
226 });
227
228 let blocks = rx.await.unwrap_or_default();
229 if blocks.is_empty() {
230 None
231 } else {
232 Some(blocks)
233 }
234 }
235}
236
/// The task that owns the caches and serves [`CacheAction`]s sent by [`EthStateCache`]
/// handles.
///
/// It is driven as a [`Future`]. Cache misses are resolved by spawning a task that reads
/// from the provider and reports the result back through `action_tx`.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
    Provider,
    Tasks,
    LimitBlocks = ByLength,
    LimitReceipts = ByLength,
    LimitHeaders = ByLength,
> where
    Provider: BlockReader,
    LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
    LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
    LimitHeaders: Limiter<B256, Provider::Header>,
{
    /// Source of blocks, receipts and headers on a cache miss.
    provider: Provider,
    /// Cached recovered blocks plus the consumers waiting for in-flight block fetches.
    full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
    /// Cached receipts plus the consumers waiting for in-flight receipt fetches.
    receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
    /// Cached headers plus the consumers waiting for in-flight header fetches.
    headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
    /// Sender half of the action channel; cloned into spawned fetch tasks so they can
    /// report their result back to this service.
    action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Receiver half of the action channel, polled by the service.
    action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
    /// Spawner used to run provider lookups off the service task.
    action_task_spawner: Tasks,
    /// Semaphore from which every spawned lookup acquires a permit before reading from
    /// the provider.
    rate_limiter: Arc<Semaphore>,
}
288
289impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
290where
291 Provider: BlockReader + Clone + Unpin + 'static,
292 Tasks: TaskSpawner + Clone + 'static,
293{
294 fn on_new_block(
295 &mut self,
296 block_hash: B256,
297 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
298 ) {
299 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
300 for tx in queued {
302 match tx {
303 Either::Left(block_with_senders) => {
304 let _ = block_with_senders.send(res.clone());
305 }
306 Either::Right(transaction_tx) => {
307 let _ = transaction_tx.send(res.clone().map(|maybe_block| {
308 maybe_block.map(|block| block.body().transactions().to_vec())
309 }));
310 }
311 }
312 }
313 }
314
315 if let Ok(Some(block)) = res {
317 self.full_block_cache.insert(block_hash, block);
318 }
319 }
320
321 fn on_new_receipts(
322 &mut self,
323 block_hash: B256,
324 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
325 ) {
326 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
327 for tx in queued {
329 let _ = tx.send(res.clone());
330 }
331 }
332
333 if let Ok(Some(receipts)) = res {
335 self.receipts_cache.insert(block_hash, receipts);
336 }
337 }
338
339 fn on_reorg_block(
340 &mut self,
341 block_hash: B256,
342 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
343 ) {
344 let res = res.map(|b| b.map(Arc::new));
345 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
346 for tx in queued {
348 match tx {
349 Either::Left(block_with_senders) => {
350 let _ = block_with_senders.send(res.clone());
351 }
352 Either::Right(transaction_tx) => {
353 let _ = transaction_tx.send(res.clone().map(|maybe_block| {
354 maybe_block.map(|block| block.body().transactions().to_vec())
355 }));
356 }
357 }
358 }
359 }
360 }
361
362 fn on_reorg_receipts(
363 &mut self,
364 block_hash: B256,
365 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
366 ) {
367 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
368 for tx in queued {
370 let _ = tx.send(res.clone());
371 }
372 }
373 }
374
375 fn shrink_queues(&mut self) {
377 let min_capacity = 2;
378 self.full_block_cache.shrink_to(min_capacity);
379 self.receipts_cache.shrink_to(min_capacity);
380 self.headers_cache.shrink_to(min_capacity);
381 }
382
383 fn update_cached_metrics(&self) {
384 self.full_block_cache.update_cached_metrics();
385 self.receipts_cache.update_cached_metrics();
386 self.headers_cache.update_cached_metrics();
387 }
388}
389
390impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
391where
392 Provider: BlockReader + Clone + Unpin + 'static,
393 Tasks: TaskSpawner + Clone + 'static,
394{
395 type Output = ();
396
397 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398 let this = self.get_mut();
399
400 loop {
401 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
402 this.shrink_queues();
404 return Poll::Pending;
405 };
406
407 match action {
408 None => {
409 unreachable!("can't close")
410 }
411 Some(action) => {
412 match action {
413 CacheAction::GetCachedBlock { block_hash, response_tx } => {
414 let _ =
415 response_tx.send(this.full_block_cache.get(&block_hash).cloned());
416 }
417 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
418 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
419 let _ = response_tx.send(Ok(Some(block)));
420 continue
421 }
422
423 if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
425 let provider = this.provider.clone();
426 let action_tx = this.action_tx.clone();
427 let rate_limiter = this.rate_limiter.clone();
428 let mut action_sender =
429 ActionSender::new(CacheKind::Block, block_hash, action_tx);
430 this.action_task_spawner.spawn_blocking(Box::pin(async move {
431 let _permit = rate_limiter.acquire().await;
433 let block_sender = provider
436 .sealed_block_with_senders(
437 BlockHashOrNumber::Hash(block_hash),
438 TransactionVariant::WithHash,
439 )
440 .map(|maybe_block| maybe_block.map(Arc::new));
441 action_sender.send_block(block_sender);
442 }));
443 }
444 }
445 CacheAction::GetReceipts { block_hash, response_tx } => {
446 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
448 let _ = response_tx.send(Ok(Some(receipts)));
449 continue
450 }
451
452 if this.receipts_cache.queue(block_hash, response_tx) {
454 let provider = this.provider.clone();
455 let action_tx = this.action_tx.clone();
456 let rate_limiter = this.rate_limiter.clone();
457 let mut action_sender =
458 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
459 this.action_task_spawner.spawn_blocking(Box::pin(async move {
460 let _permit = rate_limiter.acquire().await;
462 let res = provider
463 .receipts_by_block(block_hash.into())
464 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
465
466 action_sender.send_receipts(res);
467 }));
468 }
469 }
470 CacheAction::GetHeader { block_hash, response_tx } => {
471 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
473 let _ = response_tx.send(Ok(header));
474 continue
475 }
476
477 if let Some(block) = this.full_block_cache.get(&block_hash) {
479 let _ = response_tx.send(Ok(block.clone_header()));
480 continue
481 }
482
483 if this.headers_cache.queue(block_hash, response_tx) {
486 let provider = this.provider.clone();
487 let action_tx = this.action_tx.clone();
488 let rate_limiter = this.rate_limiter.clone();
489 let mut action_sender =
490 ActionSender::new(CacheKind::Header, block_hash, action_tx);
491 this.action_task_spawner.spawn_blocking(Box::pin(async move {
492 let _permit = rate_limiter.acquire().await;
494 let header = provider.header(&block_hash).and_then(|header| {
495 header.ok_or_else(|| {
496 ProviderError::HeaderNotFound(block_hash.into())
497 })
498 });
499 action_sender.send_header(header);
500 }));
501 }
502 }
503 CacheAction::ReceiptsResult { block_hash, res } => {
504 this.on_new_receipts(block_hash, res);
505 }
506 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
507 Ok(Some(block_with_senders)) => {
508 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
509 }
510 Ok(None) => {
511 this.on_new_block(block_hash, Ok(None));
512 }
513 Err(e) => {
514 this.on_new_block(block_hash, Err(e));
515 }
516 },
517 CacheAction::HeaderResult { block_hash, res } => {
518 let res = *res;
519 if let Some(queued) = this.headers_cache.remove(&block_hash) {
520 for tx in queued {
522 let _ = tx.send(res.clone());
523 }
524 }
525
526 if let Ok(data) = res {
528 this.headers_cache.insert(block_hash, data);
529 }
530 }
531 CacheAction::CacheNewCanonicalChain { chain_change } => {
532 for block in chain_change.blocks {
533 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
534 }
535
536 for block_receipts in chain_change.receipts {
537 this.on_new_receipts(
538 block_receipts.block_hash,
539 Ok(Some(Arc::new(block_receipts.receipts))),
540 );
541 }
542 }
543 CacheAction::RemoveReorgedChain { chain_change } => {
544 for block in chain_change.blocks {
545 this.on_reorg_block(block.hash(), Ok(Some(block)));
546 }
547
548 for block_receipts in chain_change.receipts {
549 this.on_reorg_receipts(
550 block_receipts.block_hash,
551 Ok(Some(Arc::new(block_receipts.receipts))),
552 );
553 }
554 }
555 CacheAction::GetCachedParentBlocks {
556 block_hash,
557 max_blocks,
558 response_tx,
559 } => {
560 let mut blocks = Vec::new();
561 let mut current_hash = block_hash;
562
563 while blocks.len() < max_blocks {
565 if let Some(block) =
566 this.full_block_cache.get(¤t_hash).cloned()
567 {
568 current_hash = block.header().parent_hash();
570 blocks.push(block);
571 } else {
572 break;
574 }
575 }
576
577 let _ = response_tx.send(blocks);
578 }
579 };
580 this.update_cached_metrics();
581 }
582 }
583 }
584 }
585}
586
/// All messages the cache service processes: requests from handles, results reported by
/// spawned lookup tasks, and chain updates.
enum CacheAction<B: Block, R> {
    /// Request for a recovered block; falls back to the provider on a cache miss.
    GetBlockWithSenders {
        block_hash: B256,
        response_tx: BlockWithSendersResponseSender<B>,
    },
    /// Request for a header; falls back to the provider on a cache miss.
    GetHeader {
        block_hash: B256,
        response_tx: HeaderResponseSender<B::Header>,
    },
    /// Request for block receipts; falls back to the provider on a cache miss.
    GetReceipts {
        block_hash: B256,
        response_tx: ReceiptsResponseSender<R>,
    },
    /// Request for a block that is answered from the cache only.
    GetCachedBlock {
        block_hash: B256,
        response_tx: CachedBlockResponseSender<B>,
    },
    /// Result of a block lookup performed by a spawned task.
    BlockWithSendersResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<RecoveredBlock<B>>>>,
    },
    /// Result of a receipts lookup performed by a spawned task.
    ReceiptsResult {
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<R>>>>,
    },
    /// Result of a header lookup performed by a spawned task (boxed result).
    HeaderResult {
        block_hash: B256,
        res: Box<ProviderResult<B::Header>>,
    },
    /// Blocks and receipts of a newly committed chain that should be cached.
    CacheNewCanonicalChain {
        chain_change: ChainChange<B, R>,
    },
    /// Blocks and receipts of a reverted chain that should be dropped from the cache.
    RemoveReorgedChain {
        chain_change: ChainChange<B, R>,
    },
    /// Request for up to `max_blocks` cached blocks, following parent hashes from
    /// `block_hash`.
    GetCachedParentBlocks {
        block_hash: B256,
        max_blocks: usize,
        response_tx: CachedParentBlocksResponseSender<B>,
    },
}
629
/// The receipts of a single block, tagged with the hash of that block.
struct BlockReceipts<R> {
    /// Hash of the block the receipts belong to.
    block_hash: B256,
    /// The block's receipts.
    receipts: Vec<R>,
}
634
/// The blocks and receipts of a chain segment that was committed or reverted.
struct ChainChange<B: Block, R> {
    /// Blocks of the segment.
    blocks: Vec<RecoveredBlock<B>>,
    /// Per-block receipts of the segment.
    receipts: Vec<BlockReceipts<R>>,
}
640
641impl<B: Block, R: Clone> ChainChange<B, R> {
642 fn new<N>(chain: Arc<Chain<N>>) -> Self
643 where
644 N: NodePrimitives<Block = B, Receipt = R>,
645 {
646 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
647 .blocks_and_receipts()
648 .map(|(block, receipts)| {
649 let block_receipts =
650 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
651 (block.clone(), block_receipts)
652 })
653 .unzip();
654 Self { blocks, receipts }
655 }
656}
657
/// Identifies which kind of lookup an [`ActionSender`] reports on.
#[derive(Copy, Clone, Debug)]
enum CacheKind {
    /// A block lookup.
    Block,
    /// A receipts lookup.
    Receipt,
    /// A header lookup.
    Header,
}
665
/// Reports the result of one lookup back to the cache service.
///
/// If it is dropped before a result was sent, its `Drop` implementation sends an error
/// result instead.
#[derive(Debug)]
struct ActionSender<B: Block, R: Send + Sync> {
    /// Which kind of result message to produce.
    kind: CacheKind,
    /// Hash of the block the lookup is for.
    blockhash: B256,
    /// Channel to the service; `None` once a result has been sent.
    tx: Option<UnboundedSender<CacheAction<B, R>>>,
}
676
677impl<R: Send + Sync, B: Block> ActionSender<B, R> {
678 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
679 Self { kind, blockhash, tx: Some(tx) }
680 }
681
682 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
683 if let Some(tx) = self.tx.take() {
684 let _ = tx.send(CacheAction::BlockWithSendersResult {
685 block_hash: self.blockhash,
686 res: block_sender,
687 });
688 }
689 }
690
691 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
692 if let Some(tx) = self.tx.take() {
693 let _ =
694 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
695 }
696 }
697
698 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
699 if let Some(tx) = self.tx.take() {
700 let _ = tx.send(CacheAction::HeaderResult {
701 block_hash: self.blockhash,
702 res: Box::new(header),
703 });
704 }
705 }
706}
707impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
708 fn drop(&mut self) {
709 if let Some(tx) = self.tx.take() {
710 let msg = match self.kind {
711 CacheKind::Block => CacheAction::BlockWithSendersResult {
712 block_hash: self.blockhash,
713 res: Err(ProviderError::CacheServiceUnavailable),
714 },
715 CacheKind::Receipt => CacheAction::ReceiptsResult {
716 block_hash: self.blockhash,
717 res: Err(ProviderError::CacheServiceUnavailable),
718 },
719 CacheKind::Header => CacheAction::HeaderResult {
720 block_hash: self.blockhash,
721 res: Box::new(Err(ProviderError::CacheServiceUnavailable)),
722 },
723 };
724 let _ = tx.send(msg);
725 }
726 }
727}
728
729pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
734 eth_state_cache: EthStateCache<N::Block, N::Receipt>,
735 mut events: St,
736) where
737 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
738{
739 while let Some(event) = events.next().await {
740 if let Some(reverted) = event.reverted() {
741 let chain_change = ChainChange::new(reverted);
742
743 let _ =
744 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
745 }
746
747 let chain_change = ChainChange::new(event.committed());
748
749 let _ =
750 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
751 }
752}