1use super::{EthStateCacheConfig, MultiConsumerLruCache};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::B256;
6use futures::{future::Either, Stream, StreamExt};
7use reth_chain_state::CanonStateNotification;
8use reth_errors::{ProviderError, ProviderResult};
9use reth_execution_types::Chain;
10use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
11use reth_storage_api::{BlockReader, TransactionVariant};
12use reth_tasks::{TaskSpawner, TokioTaskExecutor};
13use schnellru::{ByLength, Limiter};
14use std::{
15 future::Future,
16 pin::Pin,
17 sync::Arc,
18 task::{Context, Poll},
19};
20use tokio::sync::{
21 mpsc::{unbounded_channel, UnboundedSender},
22 oneshot, Semaphore,
23};
24use tokio_stream::wrappers::UnboundedReceiverStream;
25
26pub mod config;
27pub mod db;
28pub mod metrics;
29pub mod multi_consumer;
30
31type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;
33
34type BlockWithSendersResponseSender<B> =
36 oneshot::Sender<ProviderResult<Option<Arc<RecoveredBlock<B>>>>>;
37
38type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
40
41type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
43
44type BlockLruCache<B, L> = MultiConsumerLruCache<
45 B256,
46 Arc<RecoveredBlock<B>>,
47 L,
48 Either<
49 BlockWithSendersResponseSender<B>,
50 BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
51 >,
52>;
53
54type ReceiptsLruCache<R, L> =
55 MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
56
57type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
58
59#[derive(Debug)]
64pub struct EthStateCache<B: Block, R> {
65 to_service: UnboundedSender<CacheAction<B, R>>,
66}
67
68impl<B: Block, R> Clone for EthStateCache<B, R> {
69 fn clone(&self) -> Self {
70 Self { to_service: self.to_service.clone() }
71 }
72}
73
74impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
75 fn create<Provider, Tasks>(
77 provider: Provider,
78 action_task_spawner: Tasks,
79 max_blocks: u32,
80 max_receipts: u32,
81 max_headers: u32,
82 max_concurrent_db_operations: usize,
83 ) -> (Self, EthStateCacheService<Provider, Tasks>)
84 where
85 Provider: BlockReader<Block = B, Receipt = R>,
86 {
87 let (to_service, rx) = unbounded_channel();
88 let service = EthStateCacheService {
89 provider,
90 full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
91 receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
92 headers_cache: HeaderLruCache::new(max_headers, "headers"),
93 action_tx: to_service.clone(),
94 action_rx: UnboundedReceiverStream::new(rx),
95 action_task_spawner,
96 rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
97 };
98 let cache = Self { to_service };
99 (cache, service)
100 }
101
102 pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
107 where
108 Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
109 {
110 Self::spawn_with(provider, config, TokioTaskExecutor::default())
111 }
112
113 pub fn spawn_with<Provider, Tasks>(
118 provider: Provider,
119 config: EthStateCacheConfig,
120 executor: Tasks,
121 ) -> Self
122 where
123 Provider: BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
124 Tasks: TaskSpawner + Clone + 'static,
125 {
126 let EthStateCacheConfig {
127 max_blocks,
128 max_receipts,
129 max_headers,
130 max_concurrent_db_requests,
131 } = config;
132 let (this, service) = Self::create(
133 provider,
134 executor.clone(),
135 max_blocks,
136 max_receipts,
137 max_headers,
138 max_concurrent_db_requests,
139 );
140 executor.spawn_critical("eth state cache", Box::pin(service));
141 this
142 }
143
144 pub async fn get_recovered_block(
148 &self,
149 block_hash: B256,
150 ) -> ProviderResult<Option<Arc<RecoveredBlock<B>>>> {
151 let (response_tx, rx) = oneshot::channel();
152 let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
153 rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
154 }
155
156 pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult<Option<Arc<Vec<R>>>> {
160 let (response_tx, rx) = oneshot::channel();
161 let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
162 rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
163 }
164
165 pub async fn get_block_and_receipts(
167 &self,
168 block_hash: B256,
169 ) -> ProviderResult<Option<(Arc<RecoveredBlock<B>>, Arc<Vec<R>>)>> {
170 let block = self.get_recovered_block(block_hash);
171 let receipts = self.get_receipts(block_hash);
172
173 let (block, receipts) = futures::try_join!(block, receipts)?;
174
175 Ok(block.zip(receipts))
176 }
177
178 pub async fn get_header(&self, block_hash: B256) -> ProviderResult<B::Header> {
182 let (response_tx, rx) = oneshot::channel();
183 let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
184 rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
185 }
186}
187
188#[must_use = "Type does nothing unless spawned"]
205pub(crate) struct EthStateCacheService<
206 Provider,
207 Tasks,
208 LimitBlocks = ByLength,
209 LimitReceipts = ByLength,
210 LimitHeaders = ByLength,
211> where
212 Provider: BlockReader,
213 LimitBlocks: Limiter<B256, Arc<RecoveredBlock<Provider::Block>>>,
214 LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
215 LimitHeaders: Limiter<B256, Provider::Header>,
216{
217 provider: Provider,
219 full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
221 receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
223 headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
228 action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
230 action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
232 action_task_spawner: Tasks,
234 rate_limiter: Arc<Semaphore>,
238}
239
240impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
241where
242 Provider: BlockReader + Clone + Unpin + 'static,
243 Tasks: TaskSpawner + Clone + 'static,
244{
245 fn on_new_block(
246 &mut self,
247 block_hash: B256,
248 res: ProviderResult<Option<Arc<RecoveredBlock<Provider::Block>>>>,
249 ) {
250 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
251 for tx in queued {
253 match tx {
254 Either::Left(block_with_senders) => {
255 let _ = block_with_senders.send(res.clone());
256 }
257 Either::Right(transaction_tx) => {
258 let _ = transaction_tx.send(res.clone().map(|maybe_block| {
259 maybe_block.map(|block| block.body().transactions().to_vec())
260 }));
261 }
262 }
263 }
264 }
265
266 if let Ok(Some(block)) = res {
268 self.full_block_cache.insert(block_hash, block);
269 }
270 }
271
272 fn on_new_receipts(
273 &mut self,
274 block_hash: B256,
275 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
276 ) {
277 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
278 for tx in queued {
280 let _ = tx.send(res.clone());
281 }
282 }
283
284 if let Ok(Some(receipts)) = res {
286 self.receipts_cache.insert(block_hash, receipts);
287 }
288 }
289
290 fn on_reorg_block(
291 &mut self,
292 block_hash: B256,
293 res: ProviderResult<Option<RecoveredBlock<Provider::Block>>>,
294 ) {
295 let res = res.map(|b| b.map(Arc::new));
296 if let Some(queued) = self.full_block_cache.remove(&block_hash) {
297 for tx in queued {
299 match tx {
300 Either::Left(block_with_senders) => {
301 let _ = block_with_senders.send(res.clone());
302 }
303 Either::Right(transaction_tx) => {
304 let _ = transaction_tx.send(res.clone().map(|maybe_block| {
305 maybe_block.map(|block| block.body().transactions().to_vec())
306 }));
307 }
308 }
309 }
310 }
311 }
312
313 fn on_reorg_receipts(
314 &mut self,
315 block_hash: B256,
316 res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
317 ) {
318 if let Some(queued) = self.receipts_cache.remove(&block_hash) {
319 for tx in queued {
321 let _ = tx.send(res.clone());
322 }
323 }
324 }
325
326 fn shrink_queues(&mut self) {
328 let min_capacity = 2;
329 self.full_block_cache.shrink_to(min_capacity);
330 self.receipts_cache.shrink_to(min_capacity);
331 self.headers_cache.shrink_to(min_capacity);
332 }
333
334 fn update_cached_metrics(&self) {
335 self.full_block_cache.update_cached_metrics();
336 self.receipts_cache.update_cached_metrics();
337 self.headers_cache.update_cached_metrics();
338 }
339}
340
341impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
342where
343 Provider: BlockReader + Clone + Unpin + 'static,
344 Tasks: TaskSpawner + Clone + 'static,
345{
346 type Output = ();
347
348 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
349 let this = self.get_mut();
350
351 loop {
352 let Poll::Ready(action) = this.action_rx.poll_next_unpin(cx) else {
353 this.shrink_queues();
355 return Poll::Pending;
356 };
357
358 match action {
359 None => {
360 unreachable!("can't close")
361 }
362 Some(action) => {
363 match action {
364 CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
365 if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
366 let _ = response_tx.send(Ok(Some(block)));
367 continue
368 }
369
370 if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
372 let provider = this.provider.clone();
373 let action_tx = this.action_tx.clone();
374 let rate_limiter = this.rate_limiter.clone();
375 let mut action_sender =
376 ActionSender::new(CacheKind::Block, block_hash, action_tx);
377 this.action_task_spawner.spawn_blocking(Box::pin(async move {
378 let _permit = rate_limiter.acquire().await;
380 let block_sender = provider
383 .sealed_block_with_senders(
384 BlockHashOrNumber::Hash(block_hash),
385 TransactionVariant::WithHash,
386 )
387 .map(|maybe_block| maybe_block.map(Arc::new));
388 action_sender.send_block(block_sender);
389 }));
390 }
391 }
392 CacheAction::GetReceipts { block_hash, response_tx } => {
393 if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
395 let _ = response_tx.send(Ok(Some(receipts)));
396 continue
397 }
398
399 if this.receipts_cache.queue(block_hash, response_tx) {
401 let provider = this.provider.clone();
402 let action_tx = this.action_tx.clone();
403 let rate_limiter = this.rate_limiter.clone();
404 let mut action_sender =
405 ActionSender::new(CacheKind::Receipt, block_hash, action_tx);
406 this.action_task_spawner.spawn_blocking(Box::pin(async move {
407 let _permit = rate_limiter.acquire().await;
409 let res = provider
410 .receipts_by_block(block_hash.into())
411 .map(|maybe_receipts| maybe_receipts.map(Arc::new));
412
413 action_sender.send_receipts(res);
414 }));
415 }
416 }
417 CacheAction::GetHeader { block_hash, response_tx } => {
418 if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
420 let _ = response_tx.send(Ok(header));
421 continue
422 }
423
424 if let Some(block) = this.full_block_cache.get(&block_hash) {
426 let _ = response_tx.send(Ok(block.clone_header()));
427 continue
428 }
429
430 if this.headers_cache.queue(block_hash, response_tx) {
433 let provider = this.provider.clone();
434 let action_tx = this.action_tx.clone();
435 let rate_limiter = this.rate_limiter.clone();
436 let mut action_sender =
437 ActionSender::new(CacheKind::Header, block_hash, action_tx);
438 this.action_task_spawner.spawn_blocking(Box::pin(async move {
439 let _permit = rate_limiter.acquire().await;
441 let header = provider.header(&block_hash).and_then(|header| {
442 header.ok_or_else(|| {
443 ProviderError::HeaderNotFound(block_hash.into())
444 })
445 });
446 action_sender.send_header(header);
447 }));
448 }
449 }
450 CacheAction::ReceiptsResult { block_hash, res } => {
451 this.on_new_receipts(block_hash, res);
452 }
453 CacheAction::BlockWithSendersResult { block_hash, res } => match res {
454 Ok(Some(block_with_senders)) => {
455 this.on_new_block(block_hash, Ok(Some(block_with_senders)));
456 }
457 Ok(None) => {
458 this.on_new_block(block_hash, Ok(None));
459 }
460 Err(e) => {
461 this.on_new_block(block_hash, Err(e));
462 }
463 },
464 CacheAction::HeaderResult { block_hash, res } => {
465 let res = *res;
466 if let Some(queued) = this.headers_cache.remove(&block_hash) {
467 for tx in queued {
469 let _ = tx.send(res.clone());
470 }
471 }
472
473 if let Ok(data) = res {
475 this.headers_cache.insert(block_hash, data);
476 }
477 }
478 CacheAction::CacheNewCanonicalChain { chain_change } => {
479 for block in chain_change.blocks {
480 this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
481 }
482
483 for block_receipts in chain_change.receipts {
484 this.on_new_receipts(
485 block_receipts.block_hash,
486 Ok(Some(Arc::new(block_receipts.receipts))),
487 );
488 }
489 }
490 CacheAction::RemoveReorgedChain { chain_change } => {
491 for block in chain_change.blocks {
492 this.on_reorg_block(block.hash(), Ok(Some(block)));
493 }
494
495 for block_receipts in chain_change.receipts {
496 this.on_reorg_receipts(
497 block_receipts.block_hash,
498 Ok(Some(Arc::new(block_receipts.receipts))),
499 );
500 }
501 }
502 };
503 this.update_cached_metrics();
504 }
505 }
506 }
507 }
508}
509
510enum CacheAction<B: Block, R> {
512 GetBlockWithSenders { block_hash: B256, response_tx: BlockWithSendersResponseSender<B> },
513 GetHeader { block_hash: B256, response_tx: HeaderResponseSender<B::Header> },
514 GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender<R> },
515 BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<Arc<RecoveredBlock<B>>>> },
516 ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Arc<Vec<R>>>> },
517 HeaderResult { block_hash: B256, res: Box<ProviderResult<B::Header>> },
518 CacheNewCanonicalChain { chain_change: ChainChange<B, R> },
519 RemoveReorgedChain { chain_change: ChainChange<B, R> },
520}
521
522struct BlockReceipts<R> {
523 block_hash: B256,
524 receipts: Vec<R>,
525}
526
527struct ChainChange<B: Block, R> {
529 blocks: Vec<RecoveredBlock<B>>,
530 receipts: Vec<BlockReceipts<R>>,
531}
532
533impl<B: Block, R: Clone> ChainChange<B, R> {
534 fn new<N>(chain: Arc<Chain<N>>) -> Self
535 where
536 N: NodePrimitives<Block = B, Receipt = R>,
537 {
538 let (blocks, receipts): (Vec<_>, Vec<_>) = chain
539 .blocks_and_receipts()
540 .map(|(block, receipts)| {
541 let block_receipts =
542 BlockReceipts { block_hash: block.hash(), receipts: receipts.clone() };
543 (block.clone(), block_receipts)
544 })
545 .unzip();
546 Self { blocks, receipts }
547 }
548}
549
/// Which kind of lookup an [`ActionSender`] reports on.
///
/// It selects the result action that is sent if the sender is dropped without delivering one.
#[derive(Debug, Clone, Copy)]
enum CacheKind {
    /// A block lookup.
    Block,
    /// A receipts lookup.
    Receipt,
    /// A header lookup.
    Header,
}
557
558#[derive(Debug)]
563struct ActionSender<B: Block, R: Send + Sync> {
564 kind: CacheKind,
565 blockhash: B256,
566 tx: Option<UnboundedSender<CacheAction<B, R>>>,
567}
568
569impl<R: Send + Sync, B: Block> ActionSender<B, R> {
570 const fn new(kind: CacheKind, blockhash: B256, tx: UnboundedSender<CacheAction<B, R>>) -> Self {
571 Self { kind, blockhash, tx: Some(tx) }
572 }
573
574 fn send_block(&mut self, block_sender: Result<Option<Arc<RecoveredBlock<B>>>, ProviderError>) {
575 if let Some(tx) = self.tx.take() {
576 let _ = tx.send(CacheAction::BlockWithSendersResult {
577 block_hash: self.blockhash,
578 res: block_sender,
579 });
580 }
581 }
582
583 fn send_receipts(&mut self, receipts: Result<Option<Arc<Vec<R>>>, ProviderError>) {
584 if let Some(tx) = self.tx.take() {
585 let _ =
586 tx.send(CacheAction::ReceiptsResult { block_hash: self.blockhash, res: receipts });
587 }
588 }
589
590 fn send_header(&mut self, header: Result<<B as Block>::Header, ProviderError>) {
591 if let Some(tx) = self.tx.take() {
592 let _ = tx.send(CacheAction::HeaderResult {
593 block_hash: self.blockhash,
594 res: Box::new(header),
595 });
596 }
597 }
598}
599impl<R: Send + Sync, B: Block> Drop for ActionSender<B, R> {
600 fn drop(&mut self) {
601 if let Some(tx) = self.tx.take() {
602 let msg = match self.kind {
603 CacheKind::Block => CacheAction::BlockWithSendersResult {
604 block_hash: self.blockhash,
605 res: Err(ProviderError::CacheServiceUnavailable),
606 },
607 CacheKind::Receipt => CacheAction::ReceiptsResult {
608 block_hash: self.blockhash,
609 res: Err(ProviderError::CacheServiceUnavailable),
610 },
611 CacheKind::Header => CacheAction::HeaderResult {
612 block_hash: self.blockhash,
613 res: Box::new(Err(ProviderError::CacheServiceUnavailable)),
614 },
615 };
616 let _ = tx.send(msg);
617 }
618 }
619}
620
621pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
626 eth_state_cache: EthStateCache<N::Block, N::Receipt>,
627 mut events: St,
628) where
629 St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
630{
631 while let Some(event) = events.next().await {
632 if let Some(reverted) = event.reverted() {
633 let chain_change = ChainChange::new(reverted);
634
635 let _ =
636 eth_state_cache.to_service.send(CacheAction::RemoveReorgedChain { chain_change });
637 }
638
639 let chain_change = ChainChange::new(event.committed());
640
641 let _ =
642 eth_state_cache.to_service.send(CacheAction::CacheNewCanonicalChain { chain_change });
643 }
644}