use alloy_consensus::Header;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use futures::{future::Either, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_evm::{provider::EvmEnvProvider, ConfigureEvm};
use reth_execution_types::Chain;
use reth_primitives::{Receipt, SealedBlockWithSenders, TransactionSigned};
use reth_storage_api::{BlockReader, StateProviderFactory, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use revm::primitives::{BlockEnv, CfgEnv, CfgEnvWithHandlerCfg, SpecId};
use schnellru::{ByLength, Limiter};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot, Semaphore,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use super::{EthStateCacheConfig, MultiConsumerLruCache};
// Submodules declared next to the cache service.
// NOTE(review): their contents are not visible from this file; going by the names alone they
// hold the cache configuration, database helpers, metrics and the multi-consumer LRU cache —
// confirm against the module sources.
pub mod config;
pub mod db;
pub mod metrics;
pub mod multi_consumer;
/// Oneshot sender used to answer a request for the transactions of a block.
type BlockTransactionsResponseSender =
oneshot::Sender<ProviderResult<Option<Vec<TransactionSigned>>>>;
/// Oneshot sender used to answer a request for a sealed block together with its senders.
type BlockWithSendersResponseSender =
oneshot::Sender<ProviderResult<Option<Arc<SealedBlockWithSenders>>>>;
/// Oneshot sender used to answer a request for the receipts of a block.
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Arc<Vec<Receipt>>>>>;
/// Oneshot sender used to answer a request for the EVM configuration and block environment.
type EnvResponseSender = oneshot::Sender<ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>>;
/// Block cache keyed by block hash.
///
/// A queued consumer is either a full-block sender (`Either::Left`) or a transactions-only
/// sender (`Either::Right`).
type BlockLruCache<L> = MultiConsumerLruCache<
B256,
Arc<SealedBlockWithSenders>,
L,
Either<BlockWithSendersResponseSender, BlockTransactionsResponseSender>,
>;
/// Receipts cache keyed by block hash.
type ReceiptsLruCache<L> =
MultiConsumerLruCache<B256, Arc<Vec<Receipt>>, L, ReceiptsResponseSender>;
/// Cache of `(CfgEnvWithHandlerCfg, BlockEnv)` pairs keyed by block hash.
type EnvLruCache<L> =
MultiConsumerLruCache<B256, (CfgEnvWithHandlerCfg, BlockEnv), L, EnvResponseSender>;
/// Cloneable frontend handle for the cache service.
///
/// Every request made through this handle is wrapped in a `CacheAction` and sent over an
/// unbounded channel to the `EthStateCacheService`, which answers through a oneshot channel.
#[derive(Debug, Clone)]
pub struct EthStateCache {
/// Sending half of the action channel consumed by the service.
to_service: UnboundedSender<CacheAction>,
}
impl EthStateCache {
    /// Builds the frontend handle together with the (not yet spawned) service behind it.
    ///
    /// The three `max_*` values size the block, receipts and EVM-env caches, and
    /// `max_concurrent_db_operations` is the number of permits of the semaphore that every
    /// provider lookup has to acquire.
    fn create<Provider, Tasks, EvmConfig>(
        provider: Provider,
        action_task_spawner: Tasks,
        evm_config: EvmConfig,
        max_blocks: u32,
        max_receipts: u32,
        max_envs: u32,
        max_concurrent_db_operations: usize,
    ) -> (Self, EthStateCacheService<Provider, Tasks, EvmConfig>) {
        let (action_tx, action_rx) = unbounded_channel();

        // The handle and the service each keep their own sender: the handle to submit
        // requests, the service to let spawned lookups report their results back.
        let handle = Self { to_service: action_tx.clone() };
        let service = EthStateCacheService {
            provider,
            full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
            receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
            evm_env_cache: EnvLruCache::new(max_envs, "evm_env"),
            action_tx,
            action_rx: UnboundedReceiverStream::new(action_rx),
            action_task_spawner,
            rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
            evm_config,
        };

        (handle, service)
    }

    /// Creates the cache and spawns its service using `TokioTaskExecutor::default()`.
    ///
    /// See [`Self::spawn_with`] for spawning on a custom executor.
    pub fn spawn<Provider, EvmConfig>(
        provider: Provider,
        config: EthStateCacheConfig,
        evm_config: EvmConfig,
    ) -> Self
    where
        Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
        EvmConfig: ConfigureEvm<Header = Header>,
    {
        let executor = TokioTaskExecutor::default();
        Self::spawn_with(provider, config, executor, evm_config)
    }

    /// Creates the cache and spawns its service on `executor` as the critical task
    /// `"eth state cache"`.
    ///
    /// A clone of `executor` is handed to the service, which uses it to spawn its provider
    /// lookups. Returns the handle used to talk to the spawned service.
    pub fn spawn_with<Provider, Tasks, EvmConfig>(
        provider: Provider,
        config: EthStateCacheConfig,
        executor: Tasks,
        evm_config: EvmConfig,
    ) -> Self
    where
        Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
        Tasks: TaskSpawner + Clone + 'static,
        EvmConfig: ConfigureEvm<Header = Header>,
    {
        // Destructure exhaustively so that a field added to the config later cannot be
        // ignored here without a compile error.
        let EthStateCacheConfig {
            max_blocks: block_limit,
            max_receipts: receipts_limit,
            max_envs: env_limit,
            max_concurrent_db_requests: db_request_limit,
        } = config;

        let (handle, service) = Self::create(
            provider,
            executor.clone(),
            evm_config,
            block_limit,
            receipts_limit,
            env_limit,
            db_request_limit,
        );
        executor.spawn_critical("eth state cache", Box::pin(service));

        handle
    }

    /// Requests the sealed block with senders for `block_hash`.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::CacheServiceUnavailable`] if the reply channel is closed
    /// before an answer arrives; otherwise forwards the result sent by the service.
    pub async fn get_sealed_block_with_senders(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<SealedBlockWithSenders>>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = CacheAction::GetBlockWithSenders { block_hash, response_tx: reply_tx };
        // A failed send is deliberately ignored: the rejected request (and the reply sender
        // inside it) is dropped, which shows up below as a closed reply channel.
        let _ = self.to_service.send(request);

        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(ProviderError::CacheServiceUnavailable),
        }
    }

    /// Requests the receipts of the block identified by `block_hash`.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::CacheServiceUnavailable`] if the reply channel is closed
    /// before an answer arrives; otherwise forwards the result sent by the service.
    pub async fn get_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<Arc<Vec<Receipt>>>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = CacheAction::GetReceipts { block_hash, response_tx: reply_tx };
        // See `get_sealed_block_with_senders` for why the send result is ignored.
        let _ = self.to_service.send(request);

        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(ProviderError::CacheServiceUnavailable),
        }
    }

    /// Requests the block and its receipts concurrently.
    ///
    /// Yields `Some` only when both the block and the receipts were found; the first error of
    /// either request is returned as is.
    pub async fn get_block_and_receipts(
        &self,
        block_hash: B256,
    ) -> ProviderResult<Option<(Arc<SealedBlockWithSenders>, Arc<Vec<Receipt>>)>> {
        let block_fut = self.get_sealed_block_with_senders(block_hash);
        let receipts_fut = self.get_receipts(block_hash);
        let (maybe_block, maybe_receipts) = futures::try_join!(block_fut, receipts_fut)?;

        let pair = match (maybe_block, maybe_receipts) {
            (Some(block), Some(receipts)) => Some((block, receipts)),
            _ => None,
        };
        Ok(pair)
    }

    /// Requests the EVM configuration and block environment for `block_hash`.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::CacheServiceUnavailable`] if the reply channel is closed
    /// before an answer arrives; otherwise forwards the result sent by the service.
    pub async fn get_evm_env(
        &self,
        block_hash: B256,
    ) -> ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = CacheAction::GetEnv { block_hash, response_tx: reply_tx };
        // See `get_sealed_block_with_senders` for why the send result is ignored.
        let _ = self.to_service.send(request);

        match reply_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(ProviderError::CacheServiceUnavailable),
        }
    }
}
/// Service that owns the caches and processes `CacheAction`s.
///
/// It is a `Future` that has to be spawned: each poll drains the action channel, answers
/// requests from the caches where possible and otherwise spawns a provider lookup whose result
/// is sent back to the service as another action.
///
/// The `Limit*` parameters select the limiter of each cache and default to `ByLength`.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
Provider,
Tasks,
EvmConfig,
LimitBlocks = ByLength,
LimitReceipts = ByLength,
LimitEnvs = ByLength,
> where
LimitBlocks: Limiter<B256, Arc<SealedBlockWithSenders>>,
LimitReceipts: Limiter<B256, Arc<Vec<Receipt>>>,
LimitEnvs: Limiter<B256, (CfgEnvWithHandlerCfg, BlockEnv)>,
{
/// Provider queried for blocks, receipts and EVM environments that are not cached.
provider: Provider,
/// Cache of sealed blocks with senders, keyed by block hash.
full_block_cache: BlockLruCache<LimitBlocks>,
/// Cache of block receipts, keyed by block hash.
receipts_cache: ReceiptsLruCache<LimitReceipts>,
/// Cache of EVM configuration / block environment pairs, keyed by block hash.
evm_env_cache: EnvLruCache<LimitEnvs>,
/// Sender cloned into spawned lookups so they can report their result to the service.
action_tx: UnboundedSender<CacheAction>,
/// Stream of incoming actions, drained when the service is polled.
action_rx: UnboundedReceiverStream<CacheAction>,
/// Spawner used to run provider lookups via `spawn_blocking`.
action_task_spawner: Tasks,
/// Semaphore from which every spawned lookup acquires a permit before touching the provider.
rate_limiter: Arc<Semaphore>,
/// EVM configuration passed to the provider when filling an EVM environment.
evm_config: EvmConfig,
}
impl<Provider, Tasks, EvmConfig> EthStateCacheService<Provider, Tasks, EvmConfig>
where
    Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
    EvmConfig: ConfigureEvm<Header = Header>,
{
    /// Takes the consumers the block cache returns from `remove` for `block_hash` and sends
    /// each of them a copy of `res`.
    ///
    /// Full-block consumers receive `res` unchanged; transactions-only consumers receive a
    /// clone of the block's transaction list instead. Send failures are ignored because a
    /// consumer that dropped its receiver no longer needs the answer.
    ///
    /// NOTE(review): `remove` presumably also drops a cached value stored under the same hash —
    /// confirm against `MultiConsumerLruCache`.
    fn respond_to_block_consumers(
        &mut self,
        block_hash: &B256,
        res: &ProviderResult<Option<Arc<SealedBlockWithSenders>>>,
    ) {
        for consumer in self.full_block_cache.remove(block_hash).into_iter().flatten() {
            match consumer {
                Either::Left(block_tx) => {
                    let _ = block_tx.send(res.clone());
                }
                Either::Right(transactions_tx) => {
                    let transactions = res.clone().map(|maybe_block| {
                        maybe_block.map(|block| block.block.body.transactions.clone())
                    });
                    let _ = transactions_tx.send(transactions);
                }
            }
        }
    }

    /// Delivers a block lookup result to all waiting consumers and caches the block when the
    /// lookup succeeded and found one.
    fn on_new_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<SealedBlockWithSenders>>>,
    ) {
        self.respond_to_block_consumers(&block_hash, &res);

        // Only a found block is cached; `Ok(None)` and errors are not remembered.
        if let Ok(Some(block)) = res {
            self.full_block_cache.insert(block_hash, block);
        }
    }

    /// Delivers a receipts lookup result to all waiting consumers and caches the receipts when
    /// the lookup succeeded and found them.
    fn on_new_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Receipt>>>>,
    ) {
        for consumer in self.receipts_cache.remove(&block_hash).into_iter().flatten() {
            let _ = consumer.send(res.clone());
        }

        // Only found receipts are cached; `Ok(None)` and errors are not remembered.
        if let Ok(Some(receipts)) = res {
            self.receipts_cache.insert(block_hash, receipts);
        }
    }

    /// Handles a reorged block: waiting consumers still get `res`, but nothing is inserted
    /// into the block cache afterwards.
    fn on_reorg_block(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<SealedBlockWithSenders>>,
    ) {
        // Consumers expect the block behind an `Arc`, so wrap it before fanning out.
        let shared = res.map(|maybe_block| maybe_block.map(Arc::new));
        self.respond_to_block_consumers(&block_hash, &shared);
    }

    /// Handles the receipts of a reorged block: waiting consumers still get `res`, but nothing
    /// is inserted into the receipts cache afterwards.
    fn on_reorg_receipts(
        &mut self,
        block_hash: B256,
        res: ProviderResult<Option<Arc<Vec<Receipt>>>>,
    ) {
        for consumer in self.receipts_cache.remove(&block_hash).into_iter().flatten() {
            let _ = consumer.send(res.clone());
        }
    }

    /// Asks each of the three caches to refresh its cached metrics.
    fn update_cached_metrics(&self) {
        self.full_block_cache.update_cached_metrics();
        self.receipts_cache.update_cached_metrics();
        self.evm_env_cache.update_cached_metrics();
    }
}
impl<Provider, Tasks, EvmConfig> Future for EthStateCacheService<Provider, Tasks, EvmConfig>
where
Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match ready!(this.action_rx.poll_next_unpin(cx)) {
None => {
unreachable!("can't close")
}
Some(action) => {
match action {
CacheAction::GetBlockWithSenders { block_hash, response_tx } => {
if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(Some(block)));
continue
}
if this.full_block_cache.queue(block_hash, Either::Left(response_tx)) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
let _permit = rate_limiter.acquire().await;
let block_sender = provider
.sealed_block_with_senders(
BlockHashOrNumber::Hash(block_hash),
TransactionVariant::WithHash,
)
.map(|maybe_block| maybe_block.map(Arc::new));
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
block_hash,
res: block_sender,
});
}));
}
}
CacheAction::GetReceipts { block_hash, response_tx } => {
if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(Some(receipts)));
continue
}
if this.receipts_cache.queue(block_hash, response_tx) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
let _permit = rate_limiter.acquire().await;
let res = provider
.receipts_by_block(block_hash.into())
.map(|maybe_receipts| maybe_receipts.map(Arc::new));
let _ = action_tx
.send(CacheAction::ReceiptsResult { block_hash, res });
}));
}
}
CacheAction::GetEnv { block_hash, response_tx } => {
if let Some(env) = this.evm_env_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(env));
continue
}
if this.evm_env_cache.queue(block_hash, response_tx) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let evm_config = this.evm_config.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
let _permit = rate_limiter.acquire().await;
let mut cfg = CfgEnvWithHandlerCfg::new_with_spec_id(
CfgEnv::default(),
SpecId::LATEST,
);
let mut block_env = BlockEnv::default();
let res = provider
.fill_env_at(
&mut cfg,
&mut block_env,
block_hash.into(),
evm_config,
)
.map(|_| (cfg, block_env));
let _ = action_tx.send(CacheAction::EnvResult {
block_hash,
res: Box::new(res),
});
}));
}
}
CacheAction::ReceiptsResult { block_hash, res } => {
this.on_new_receipts(block_hash, res);
}
CacheAction::BlockWithSendersResult { block_hash, res } => match res {
Ok(Some(block_with_senders)) => {
this.on_new_block(block_hash, Ok(Some(block_with_senders)));
}
Ok(None) => {
this.on_new_block(block_hash, Ok(None));
}
Err(e) => {
this.on_new_block(block_hash, Err(e));
}
},
CacheAction::EnvResult { block_hash, res } => {
let res = *res;
if let Some(queued) = this.evm_env_cache.remove(&block_hash) {
for tx in queued {
let _ = tx.send(res.clone());
}
}
if let Ok(data) = res {
this.evm_env_cache.insert(block_hash, data);
}
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}
for block_receipts in chain_change.receipts {
this.on_new_receipts(
block_receipts.block_hash,
Ok(Some(Arc::new(
block_receipts.receipts.into_iter().flatten().collect(),
))),
);
}
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
this.on_reorg_block(block.hash(), Ok(Some(block)));
}
for block_receipts in chain_change.receipts {
this.on_reorg_receipts(
block_receipts.block_hash,
Ok(Some(Arc::new(
block_receipts.receipts.into_iter().flatten().collect(),
))),
);
}
}
};
this.update_cached_metrics();
}
}
}
}
}
/// Messages processed by the cache service.
///
/// `Get*` variants are requests carrying the oneshot sender to answer on, `*Result` variants
/// carry the outcome of a spawned provider lookup, and the two chain variants carry blocks and
/// receipts taken from a canonical state notification.
enum CacheAction {
/// Request for the sealed block with senders of `block_hash`.
GetBlockWithSenders {
block_hash: B256,
response_tx: BlockWithSendersResponseSender,
},
/// Request for the EVM configuration and block environment of `block_hash`.
GetEnv {
block_hash: B256,
response_tx: EnvResponseSender,
},
/// Request for the receipts of `block_hash`.
GetReceipts {
block_hash: B256,
response_tx: ReceiptsResponseSender,
},
/// Outcome of a block lookup for `block_hash`.
BlockWithSendersResult {
block_hash: B256,
res: ProviderResult<Option<Arc<SealedBlockWithSenders>>>,
},
/// Outcome of a receipts lookup for `block_hash`.
ReceiptsResult {
block_hash: B256,
res: ProviderResult<Option<Arc<Vec<Receipt>>>>,
},
/// Outcome of an EVM environment lookup for `block_hash`; the result is boxed.
EnvResult {
block_hash: B256,
res: Box<ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>>,
},
/// Blocks and receipts of a committed chain that should be put into the caches.
CacheNewCanonicalChain {
chain_change: ChainChange,
},
/// Blocks and receipts of a reverted chain.
RemoveReorgedChain {
chain_change: ChainChange,
},
}
/// Receipts of a single block, tagged with the hash of that block.
struct BlockReceipts {
/// Hash of the block the receipts belong to.
block_hash: B256,
/// Receipts of the block; individual entries may be `None`.
receipts: Vec<Option<Receipt>>,
}
/// Blocks and receipts extracted from a `Chain`, in the form the cache service consumes.
struct ChainChange {
/// Cloned blocks of the chain.
blocks: Vec<SealedBlockWithSenders>,
/// Receipts of the chain, one entry per block.
receipts: Vec<BlockReceipts>,
}
impl ChainChange {
    /// Collects a copy of every block of `chain` together with its receipts.
    ///
    /// Blocks and receipts are cloned out of the shared chain, so the returned value owns its
    /// data. Both vectors follow the iteration order of `Chain::blocks_and_receipts`.
    fn new(chain: Arc<Chain>) -> Self {
        let mut blocks = Vec::new();
        let mut receipts = Vec::new();

        for (block, block_receipts) in chain.blocks_and_receipts() {
            receipts.push(BlockReceipts {
                block_hash: block.block.hash(),
                receipts: block_receipts.clone(),
            });
            blocks.push(block.clone());
        }

        Self { blocks, receipts }
    }
}
/// Forwards canonical state notifications to the cache service until `events` ends.
///
/// For every notification the reverted chain, if there is one, is sent first as
/// `RemoveReorgedChain`, followed by the committed chain as `CacheNewCanonicalChain`.
/// Send failures are ignored.
pub async fn cache_new_blocks_task<St>(eth_state_cache: EthStateCache, mut events: St)
where
    St: Stream<Item = CanonStateNotification> + Unpin + 'static,
{
    while let Some(notification) = events.next().await {
        // The reverted segment goes out before the committed one, so the service sees the
        // removal first.
        if let Some(old_chain) = notification.reverted() {
            let removal =
                CacheAction::RemoveReorgedChain { chain_change: ChainChange::new(old_chain) };
            let _ = eth_state_cache.to_service.send(removal);
        }

        let addition = CacheAction::CacheNewCanonicalChain {
            chain_change: ChainChange::new(notification.committed()),
        };
        let _ = eth_state_cache.to_service.send(addition);
    }
}