use super::{EthStateCacheConfig, MultiConsumerLruCache};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use futures::{future::Either, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives::{NodePrimitives, SealedBlockWithSenders};
use reth_primitives_traits::{Block, BlockBody};
use reth_storage_api::{BlockReader, StateProviderFactory, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot, Semaphore,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
// Public submodules of the cache.
// NOTE(review): their contents are not visible from this file; the one-line hints below are
// inferred from the module names only — confirm against each module.

// Presumably the cache configuration type(s).
pub mod config;
// Presumably database-backed state access helpers.
pub mod db;
// Presumably metrics recorded by the caches.
pub mod metrics;
// Presumably the LRU cache that supports multiple queued consumers per key.
pub mod multi_consumer;
/// Oneshot sender used to answer a request for the transactions of a block.
type BlockTransactionsResponseSender<T> = oneshot::Sender<ProviderResult<Option<Vec<T>>>>;
/// Oneshot sender used to answer a request for a sealed block together with its senders.
type BlockWithSendersResponseSender<B> =
oneshot::Sender<ProviderResult<Option<Arc<SealedBlockWithSenders<B>>>>>;
/// Oneshot sender used to answer a request for the receipts of a block.
type ReceiptsResponseSender<R> = oneshot::Sender<ProviderResult<Option<Arc<Vec<R>>>>>;
/// Oneshot sender used to answer a request for a block header.
type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
/// Block cache keyed by block hash, storing shared sealed blocks with senders.
///
/// A queued consumer is either waiting for the whole block (`Either::Left`) or only for the
/// block's transactions (`Either::Right`).
type BlockLruCache<B, L> = MultiConsumerLruCache<
B256,
Arc<SealedBlockWithSenders<B>>,
L,
Either<
BlockWithSendersResponseSender<B>,
BlockTransactionsResponseSender<<<B as Block>::Body as BlockBody>::Transaction>,
>,
>;
/// Receipts cache keyed by block hash, storing the shared receipts of a block.
type ReceiptsLruCache<R, L> =
MultiConsumerLruCache<B256, Arc<Vec<R>>, L, ReceiptsResponseSender<R>>;
/// Header cache keyed by block hash.
type HeaderLruCache<H, L> = MultiConsumerLruCache<B256, H, L, HeaderResponseSender<H>>;
/// Frontend handle for the cache service.
///
/// The handle only holds the sending half of an unbounded channel; every request is turned into
/// a `CacheAction` and sent to the `EthStateCacheService`, which owns the actual caches.
#[derive(Debug)]
pub struct EthStateCache<B: Block, R> {
/// Channel over which actions are submitted to the service.
to_service: UnboundedSender<CacheAction<B, R>>,
}
impl<B: Block, R> Clone for EthStateCache<B, R> {
    /// Creates another handle that submits actions over the same channel.
    ///
    /// Implemented by hand so that no `Clone` bound is placed on `B` or `R`.
    fn clone(&self) -> Self {
        let to_service = self.to_service.clone();
        Self { to_service }
    }
}
impl<B: Block, R: Send + Sync> EthStateCache<B, R> {
fn create<Provider, Tasks>(
provider: Provider,
action_task_spawner: Tasks,
max_blocks: u32,
max_receipts: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
) -> (Self, EthStateCacheService<Provider, Tasks>)
where
Provider: BlockReader<Block = B, Receipt = R>,
{
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
headers_cache: HeaderLruCache::new(max_headers, "headers"),
action_tx: to_service.clone(),
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
};
let cache = Self { to_service };
(cache, service)
}
pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
where
Provider:
StateProviderFactory + BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
{
Self::spawn_with(provider, config, TokioTaskExecutor::default())
}
pub fn spawn_with<Provider, Tasks>(
provider: Provider,
config: EthStateCacheConfig,
executor: Tasks,
) -> Self
where
Provider:
StateProviderFactory + BlockReader<Block = B, Receipt = R> + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let EthStateCacheConfig {
max_blocks,
max_receipts,
max_headers,
max_concurrent_db_requests,
} = config;
let (this, service) = Self::create(
provider,
executor.clone(),
max_blocks,
max_receipts,
max_headers,
max_concurrent_db_requests,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
}
pub async fn get_sealed_block_with_senders(
&self,
block_hash: B256,
) -> ProviderResult<Option<Arc<SealedBlockWithSenders<B>>>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
pub async fn get_receipts(&self, block_hash: B256) -> ProviderResult<Option<Arc<Vec<R>>>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetReceipts { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
pub async fn get_block_and_receipts(
&self,
block_hash: B256,
) -> ProviderResult<Option<(Arc<SealedBlockWithSenders<B>>, Arc<Vec<R>>)>> {
let block = self.get_sealed_block_with_senders(block_hash);
let receipts = self.get_receipts(block_hash);
let (block, receipts) = futures::try_join!(block, receipts)?;
Ok(block.zip(receipts))
}
pub async fn get_header(&self, block_hash: B256) -> ProviderResult<B::Header> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
}
/// Service that owns the block, receipts and header caches and processes `CacheAction`s.
///
/// It is driven by polling it as a `Future`: actions are read from `action_rx`, cache misses are
/// resolved by tasks spawned on `action_task_spawner`, and those tasks report their results back
/// through `action_tx`.
///
/// The `Limit*` parameters select the limiter of each cache and default to `ByLength`.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
Provider,
Tasks,
LimitBlocks = ByLength,
LimitReceipts = ByLength,
LimitHeaders = ByLength,
> where
Provider: BlockReader,
LimitBlocks: Limiter<B256, Arc<SealedBlockWithSenders<Provider::Block>>>,
LimitReceipts: Limiter<B256, Arc<Vec<Provider::Receipt>>>,
LimitHeaders: Limiter<B256, Provider::Header>,
{
/// Source the data is loaded from on a cache miss.
provider: Provider,
/// Cache of sealed blocks with senders, keyed by block hash.
full_block_cache: BlockLruCache<Provider::Block, LimitBlocks>,
/// Cache of block receipts, keyed by block hash.
receipts_cache: ReceiptsLruCache<Provider::Receipt, LimitReceipts>,
/// Cache of headers, keyed by block hash.
headers_cache: HeaderLruCache<Provider::Header, LimitHeaders>,
/// Sender handed to spawned lookup tasks so they can send their result back as an action.
action_tx: UnboundedSender<CacheAction<Provider::Block, Provider::Receipt>>,
/// Stream of incoming actions (requests, lookup results and chain updates).
action_rx: UnboundedReceiverStream<CacheAction<Provider::Block, Provider::Receipt>>,
/// Spawner used to run the provider lookups.
action_task_spawner: Tasks,
/// Semaphore a lookup task acquires before it queries the provider.
rate_limiter: Arc<Semaphore>,
}
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
where
    Provider: StateProviderFactory + BlockReader + Clone + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    /// Removes the entry for `block_hash` from the block cache and answers every consumer that
    /// was queued for it with `res`.
    ///
    /// Consumers waiting for the full block receive `res` as is; consumers waiting only for the
    /// transactions receive the block's transactions copied into a `Vec`. Send failures (the
    /// receiver was dropped) are ignored.
    fn notify_queued_block_consumers(
        &mut self,
        block_hash: B256,
        res: &ProviderResult<Option<Arc<SealedBlockWithSenders<Provider::Block>>>>,
    ) {
        if let Some(queued) = self.full_block_cache.remove(&block_hash) {
            for consumer in queued {
                match consumer {
                    Either::Left(block_with_senders) => {
                        let _ = block_with_senders.send(res.clone());
                    }
                    Either::Right(transaction_tx) => {
                        let _ = transaction_tx.send(res.clone().map(|maybe_block| {
                            maybe_block.map(|block| block.block.body.transactions().to_vec())
                        }));
                    }
                }
            }
        }
    }

    /// Removes the entry for `block_hash` from the receipts cache and answers every consumer
    /// that was queued for it with `res`. Send failures (the receiver was dropped) are ignored.
    fn notify_queued_receipts_consumers(
        &mut self,
        block_hash: B256,
        res: &ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        if let Some(queued) = self.receipts_cache.remove(&block_hash) {
            for consumer in queued {
                let _ = consumer.send(res.clone());
            }
        }
    }

    /// Handles a block result: answers all queued consumers and, if a block was found, stores
    /// it in the block cache.
    fn on_new_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<SealedBlockWithSenders<Provider::Block>>>>,
    ) {
        self.notify_queued_block_consumers(block_hash, &res);

        // Only successful, non-empty results are cached.
        if let Ok(Some(block)) = res {
            self.full_block_cache.insert(block_hash, block);
        }
    }

    /// Handles a receipts result: answers all queued consumers and, if receipts were found,
    /// stores them in the receipts cache.
    fn on_new_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        self.notify_queued_receipts_consumers(block_hash, &res);

        // Only successful, non-empty results are cached.
        if let Ok(Some(receipts)) = res {
            self.receipts_cache.insert(block_hash, receipts);
        }
    }

    /// Handles a reorged block: removes its entry from the block cache and answers all queued
    /// consumers with `res`. Unlike [`Self::on_new_block`], the block is not inserted again.
    fn on_reorg_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<SealedBlockWithSenders<Provider::Block>>>,
    ) {
        let res = res.map(|maybe_block| maybe_block.map(Arc::new));
        self.notify_queued_block_consumers(block_hash, &res);
    }

    /// Handles the receipts of a reorged block: removes the entry from the receipts cache and
    /// answers all queued consumers with `res`. Unlike [`Self::on_new_receipts`], the receipts
    /// are not inserted again.
    fn on_reorg_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Provider::Receipt>>>>,
    ) {
        self.notify_queued_receipts_consumers(block_hash, &res);
    }

    /// Refreshes the metrics of all three caches.
    fn update_cached_metrics(&self) {
        self.full_block_cache.update_cached_metrics();
        self.receipts_cache.update_cached_metrics();
        self.headers_cache.update_cached_metrics();
    }
}
impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
where
    Provider: StateProviderFactory + BlockReader + Clone + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    type Output = ();

    /// Drives the service: drains `action_rx` and handles one action per loop iteration.
    ///
    /// * `Get*` actions are answered directly on a cache hit. On a miss the response sender is
    ///   queued in the matching cache and, if `queue` returns `true`, a lookup task is spawned
    ///   that sends its result back as a `*Result` action.
    /// * `*Result` actions answer the queued consumers and update the cache.
    /// * Chain actions feed committed blocks into the caches or drop reorged ones.
    ///
    /// Returns `Poll::Pending` once no more actions are available; the future never resolves.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            match ready!(this.action_rx.poll_next_unpin(cx)) {
                None => {
                    // The service holds a sender itself (`action_tx`), so the channel cannot
                    // be closed while the service is alive.
                    unreachable!("can't close")
                }
                Some(action) => {
                    match action {
                        CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
                            // Cache hit: answer right away. `continue` also skips the metrics
                            // update at the end of the iteration.
                            if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(Some(block)));
                                continue
                            }

                            // Cache miss: queue the consumer. NOTE(review): `queue` presumably
                            // returns `true` only when no lookup is in flight for this hash yet
                            // — confirm against `MultiConsumerLruCache::queue`.
                            if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
                                    // Wait for a permit before querying the provider; it is
                                    // held until the task finishes.
                                    let _permit = rate_limiter.acquire().await;
                                    let block_sender = provider
                                        .sealed_block_with_senders(
                                            BlockHashOrNumber::Hash(block_hash),
                                            TransactionVariant::WithHash,
                                        )
                                        .map(|maybe_block| maybe_block.map(Arc::new));
                                    let _ = action_tx.send(CacheAction::BlockWithSendersResult {
                                        block_hash,
                                        res: block_sender,
                                    });
                                }));
                            }
                        }
                        CacheAction::GetReceipts { block_hash, response_tx } => {
                            // Cache hit: answer right away (skips the metrics update).
                            if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(Some(receipts)));
                                continue
                            }

                            // Cache miss: queue the consumer and spawn a lookup if requested.
                            if this.receipts_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
                                    // Wait for a permit before querying the provider.
                                    let _permit = rate_limiter.acquire().await;
                                    let res = provider
                                        .receipts_by_block(block_hash.into())
                                        .map(|maybe_receipts| maybe_receipts.map(Arc::new));

                                    let _ = action_tx
                                        .send(CacheAction::ReceiptsResult { block_hash, res });
                                }));
                            }
                        }
                        CacheAction::GetHeader { block_hash, response_tx } => {
                            // Cache hit: answer right away (skips the metrics update).
                            if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
                                let _ = response_tx.send(Ok(header));
                                continue
                            }

                            // Cache miss: queue the consumer and spawn a lookup if requested.
                            if this.headers_cache.queue(block_hash, response_tx) {
                                let provider = this.provider.clone();
                                let action_tx = this.action_tx.clone();
                                let rate_limiter = this.rate_limiter.clone();
                                this.action_task_spawner.spawn_blocking(Box::pin(async move {
                                    // Wait for a permit before querying the provider.
                                    let _permit = rate_limiter.acquire().await;
                                    // A missing header is reported as an error, not as `None`.
                                    let header = provider.header(&block_hash).and_then(|header| {
                                        header.ok_or_else(|| {
                                            ProviderError::HeaderNotFound(block_hash.into())
                                        })
                                    });
                                    let _ = action_tx.send(CacheAction::HeaderResult {
                                        block_hash,
                                        res: Box::new(header),
                                    });
                                }));
                            }
                        }
                        CacheAction::ReceiptsResult { block_hash, res } => {
                            this.on_new_receipts(block_hash, res);
                        }
                        CacheAction::BlockWithSendersResult { block_hash, res } => {
                            // Every outcome (found, not found, error) is handled the same way
                            // by `on_new_block`, so the result is forwarded unchanged.
                            this.on_new_block(block_hash, res);
                        }
                        CacheAction::HeaderResult { block_hash, res } => {
                            let res = *res;
                            // Answer all consumers queued for this header.
                            if let Some(queued) = this.headers_cache.remove(&block_hash) {
                                for tx in queued {
                                    let _ = tx.send(res.clone());
                                }
                            }

                            // Only successful lookups are cached.
                            if let Ok(data) = res {
                                this.headers_cache.insert(block_hash, data);
                            }
                        }
                        CacheAction::CacheNewCanonicalChain { chain_change } => {
                            for block in chain_change.blocks {
                                this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
                            }

                            for block_receipts in chain_change.receipts {
                                this.on_new_receipts(
                                    block_receipts.block_hash,
                                    // `None` receipt slots are dropped by `flatten`.
                                    Ok(Some(Arc::new(
                                        block_receipts.receipts.into_iter().flatten().collect(),
                                    ))),
                                );
                            }
                        }
                        CacheAction::RemoveReorgedChain { chain_change } => {
                            for block in chain_change.blocks {
                                this.on_reorg_block(block.hash(), Ok(Some(block)));
                            }

                            for block_receipts in chain_change.receipts {
                                this.on_reorg_receipts(
                                    block_receipts.block_hash,
                                    // `None` receipt slots are dropped by `flatten`.
                                    Ok(Some(Arc::new(
                                        block_receipts.receipts.into_iter().flatten().collect(),
                                    ))),
                                );
                            }
                        }
                    };
                    this.update_cached_metrics();
                }
            }
        }
    }
}
/// All messages the cache service processes: requests from handles, results of spawned lookups
/// and chain updates.
enum CacheAction<B: Block, R> {
/// Request for the sealed block with senders of `block_hash`; answered via `response_tx`.
GetBlockWithSenders {
block_hash: B256,
response_tx: BlockWithSendersResponseSender<B>,
},
/// Request for the header of `block_hash`; answered via `response_tx`.
GetHeader {
block_hash: B256,
response_tx: HeaderResponseSender<B::Header>,
},
/// Request for the receipts of `block_hash`; answered via `response_tx`.
GetReceipts {
block_hash: B256,
response_tx: ReceiptsResponseSender<R>,
},
/// Result of a block lookup for `block_hash`.
BlockWithSendersResult {
block_hash: B256,
res: ProviderResult<Option<Arc<SealedBlockWithSenders<B>>>>,
},
/// Result of a receipts lookup for `block_hash`.
ReceiptsResult {
block_hash: B256,
res: ProviderResult<Option<Arc<Vec<R>>>>,
},
/// Result of a header lookup for `block_hash`; the result is boxed.
HeaderResult {
block_hash: B256,
res: Box<ProviderResult<B::Header>>,
},
/// Blocks and receipts of a newly committed chain segment that should be cached.
CacheNewCanonicalChain {
chain_change: ChainChange<B, R>,
},
/// Blocks and receipts of a reverted chain segment that should be removed from the caches.
RemoveReorgedChain {
chain_change: ChainChange<B, R>,
},
}
/// Receipts of a single block, identified by the block's hash.
struct BlockReceipts<R> {
/// Hash of the block the receipts belong to.
block_hash: B256,
/// Receipts of the block; the service drops `None` entries before caching them.
receipts: Vec<Option<R>>,
}
/// Blocks and receipts extracted from a chain segment that was committed or reverted.
struct ChainChange<B: Block, R> {
/// Blocks of the chain segment.
blocks: Vec<SealedBlockWithSenders<B>>,
/// Receipts of the chain segment, one entry per block.
receipts: Vec<BlockReceipts<R>>,
}
impl<B: Block, R: Clone> ChainChange<B, R> {
    /// Collects clones of every block of `chain` and of the receipts that belong to it.
    ///
    /// The two resulting vectors are filled in the order `chain.blocks_and_receipts()` yields
    /// its pairs, so entry `i` of `receipts` belongs to entry `i` of `blocks`.
    fn new<N>(chain: Arc<Chain<N>>) -> Self
    where
        N: NodePrimitives<Block = B, Receipt = R>,
    {
        let mut blocks = Vec::new();
        let mut receipts = Vec::new();

        for (block, receipts_of_block) in chain.blocks_and_receipts() {
            receipts.push(BlockReceipts {
                block_hash: block.block.hash(),
                receipts: receipts_of_block.clone(),
            });
            blocks.push(block.clone());
        }

        Self { blocks, receipts }
    }
}
/// Forwards canonical state notifications from `events` to the cache service.
///
/// For every notification the reverted chain segment (if any) is sent first as a
/// `RemoveReorgedChain` action, followed by the committed segment as a `CacheNewCanonicalChain`
/// action. Send failures are ignored. The task ends when the stream ends.
pub async fn cache_new_blocks_task<St, N: NodePrimitives>(
    eth_state_cache: EthStateCache<N::Block, N::Receipt>,
    mut events: St,
) where
    St: Stream<Item = CanonStateNotification<N>> + Unpin + 'static,
{
    loop {
        let event = match events.next().await {
            Some(event) => event,
            None => break,
        };

        // Reverted blocks must be dropped before the new canonical blocks are cached.
        if let Some(reverted) = event.reverted() {
            let action =
                CacheAction::RemoveReorgedChain { chain_change: ChainChange::new(reverted) };
            let _ = eth_state_cache.to_service.send(action);
        }

        let action = CacheAction::CacheNewCanonicalChain {
            chain_change: ChainChange::new(event.committed()),
        };
        let _ = eth_state_cache.to_service.send(action);
    }
}