use alloy_consensus::BlockHeader;
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
PendingTransactionFilterKind,
};
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_chainspec::ChainInfo;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
ProviderError, ProviderReceipt,
};
use reth_rpc_eth_api::{
EthApiTypes, EthFilterApiServer, FullEthApiTypes, RpcNodeCoreExt, RpcTransaction,
TransactionCompat,
};
use reth_rpc_eth_types::{
logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
};
use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
use reth_rpc_types_compat::transaction::from_recovered;
use reth_tasks::TaskSpawner;
use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
use std::{
collections::HashMap,
fmt,
iter::StepBy,
ops::RangeInclusive,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
sync::{mpsc::Receiver, Mutex},
time::MissedTickBehavior,
};
use tracing::{error, trace};
/// Step used to split a log query's block range into chunks; the headers of each chunk are
/// fetched from the provider in a single `headers_range` call.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Handler for the `eth_` filter RPC endpoints.
///
/// All state lives behind an [`Arc`], so the handler can be cloned cheaply and shared with the
/// background task that evicts stale filters.
pub struct EthFilter<Eth: EthApiTypes> {
    /// Shared state: the wrapped eth API, the installed filters and the configured limits.
    inner: Arc<EthFilterInner<Eth>>,
}
impl<Eth> Clone for EthFilter<Eth>
where
Eth: EthApiTypes,
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}
impl<Eth> EthFilter<Eth>
where
Eth: EthApiTypes + 'static,
{
/// Creates a new filter handler and spawns its stale-filter cleanup task.
///
/// Limits that are absent from `config` are treated as unlimited (`u64::MAX` blocks per
/// filter, `usize::MAX` logs per response). The cleanup task is spawned as a critical task on
/// `task_spawner` and holds its own clone of the returned handler.
pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
    let inner = EthFilterInner {
        eth_api,
        active_filters: ActiveFilters::new(),
        id_provider: Arc::new(EthSubscriptionIdProvider::default()),
        max_headers_range: MAX_HEADERS_RANGE,
        task_spawner,
        stale_filter_ttl: config.stale_filter_ttl,
        max_blocks_per_filter: config.max_blocks_per_filter.unwrap_or(u64::MAX),
        max_logs_per_response: config.max_logs_per_response.unwrap_or(usize::MAX),
    };
    let eth_filter = Self { inner: Arc::new(inner) };

    // The background task needs its own handle because it outlives this call.
    let cleaner = eth_filter.clone();
    eth_filter.inner.task_spawner.spawn_critical(
        "eth-filters_stale-filters-clean",
        Box::pin(async move {
            cleaner.watch_and_clear_stale_filters().await;
        }),
    );

    eth_filter
}
pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
&self.inner.active_filters
}
/// Endless loop that evicts stale filters once per `stale_filter_ttl`.
///
/// The first eviction pass runs one full TTL after the loop starts. Missed ticks are delayed
/// rather than bursted.
async fn watch_and_clear_stale_filters(&self) {
    let ttl = self.inner.stale_filter_ttl;
    let first_tick = tokio::time::Instant::now() + ttl;
    let mut ticker = tokio::time::interval_at(first_tick, ttl);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        ticker.tick().await;
        let now = Instant::now();
        self.clear_stale_filters(now).await;
    }
}
/// Removes every filter whose last poll is at least `stale_filter_ttl` older than `now`.
///
/// Filters polled more recently than that are kept. Evictions are logged at trace level.
pub async fn clear_stale_filters(&self, now: Instant) {
    trace!(target: "rpc::eth", "clear stale filters");
    let ttl = self.inner.stale_filter_ttl;
    self.active_filters().inner.lock().await.retain(|id, filter| {
        // `now` is captured by the caller before the lock is acquired, so a concurrent poll
        // may have stamped the filter with a later instant. Use the explicitly saturating
        // difference (zero in that case) instead of `Instant - Instant`, which std documents
        // as allowed to panic again in future versions; a panic here would kill the critical
        // cleanup task.
        let is_valid = now.saturating_duration_since(filter.last_poll_timestamp) < ttl;
        if !is_valid {
            trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
        }
        is_valid
    })
}
}
impl<Eth> EthFilter<Eth>
where
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
{
/// Returns the provider exposed by the wrapped eth API.
fn provider(&self) -> &Eth::Provider {
    let api = &self.inner.eth_api;
    api.provider()
}
/// Returns the transaction pool exposed by the wrapped eth API.
fn pool(&self) -> &Eth::Pool {
    let api = &self.inner.eth_api;
    api.pool()
}
/// Returns everything that matched the filter `id` since it was last polled.
///
/// Polling advances the filter's start block to `best_number + 1` and refreshes its
/// last-poll timestamp.
///
/// # Errors
///
/// Returns [`EthFilterError::FilterNotFound`] if no filter with `id` is installed, and
/// propagates provider errors and errors from the log query.
pub async fn filter_changes(
&self,
id: FilterId,
) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
let info = self.provider().chain_info()?;
let best_number = info.best_number;
// `start_block` is the block the filter was waiting for, i.e. the value stored in
// `filter.block` before this poll. The lock is held only for this block scope.
let (start_block, kind) = {
let mut filters = self.inner.active_filters.inner.lock().await;
let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
// NOTE(review): this early return happens before the filter kind is inspected, so it also
// applies to pending-transaction filters when no new block has arrived; it also skips the
// timestamp refresh below — confirm both are intended.
if filter.block > best_number {
// No new blocks since the last poll.
return Ok(FilterChanges::Empty)
}
// Advance the filter to `best_number + 1`; after the swap `block` holds the previous
// start block.
let mut block = best_number + 1;
std::mem::swap(&mut filter.block, &mut block);
filter.last_poll_timestamp = Instant::now();
(block, filter.kind.clone())
};
match kind {
FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
FilterKind::Block => {
// Presumably `canonical_hashes_range` treats its end bound as exclusive, hence the `+ 1`
// to include `best_number` — TODO confirm against the provider API.
let end_block = best_number + 1;
let block_hashes =
self.provider().canonical_hashes_range(start_block, end_block).map_err(
|_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
)?;
Ok(FilterChanges::Hashes(block_hashes))
}
FilterKind::Log(filter) => {
let (from_block_number, to_block_number) = match filter.block_option {
FilterBlockOption::Range { from_block, to_block } => {
// Resolve block tags/numbers to concrete block numbers; unresolved values become `None`.
let from = from_block
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
let to = to_block
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
logs_utils::get_filter_block_range(from, to, start_block, info)
}
FilterBlockOption::AtBlockHash(_) => {
// The hash itself is ignored here: the polled range is everything since the last poll.
// `get_logs_in_block_range` treats both bounds as inclusive.
(start_block, best_number)
}
};
let logs = self
.inner
.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
.await?;
Ok(FilterChanges::Logs(logs))
}
}
}
/// Returns all logs matching the log filter installed under `id`.
///
/// # Errors
///
/// Returns [`EthFilterError::FilterNotFound`] if no filter with `id` exists or if the filter
/// is not a log filter.
pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
    // Copy the filter out so the registry lock is released before the query runs.
    let filter = {
        let filters = self.inner.active_filters.inner.lock().await;
        let installed_kind = filters.get(&id).map(|active| &active.kind);
        match installed_kind {
            Some(FilterKind::Log(log_filter)) => (**log_filter).clone(),
            _ => return Err(EthFilterError::FilterNotFound(id)),
        }
    };

    self.inner.logs_for_filter(filter).await
}
}
/// JSON-RPC entry points for the filter API; each handler traces the call and delegates to the
/// inherent methods or the shared inner state.
#[async_trait]
impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
where
    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
{
    /// Handler for `eth_newFilter`: installs a log filter.
    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
        trace!(target: "rpc::eth", "Serving eth_newFilter");
        let kind = FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter));
        self.inner.install_filter(kind).await
    }

    /// Handler for `eth_newBlockFilter`: installs a new-block filter.
    async fn new_block_filter(&self) -> RpcResult<FilterId> {
        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
        let kind = FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block;
        self.inner.install_filter(kind).await
    }

    /// Handler for `eth_newPendingTransactionFilter`.
    ///
    /// When `kind` is `None` the default [`PendingTransactionFilterKind`] is used.
    async fn new_pending_transaction_filter(
        &self,
        kind: Option<PendingTransactionFilterKind>,
    ) -> RpcResult<FilterId> {
        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");

        let requested = kind.unwrap_or_default();
        let filter_kind = match requested {
            PendingTransactionFilterKind::Hashes => {
                // Hash-only variant: subscribe to the pool's pending transaction hashes.
                let hashes = self.pool().pending_transactions_listener();
                let hashes_receiver = PendingTransactionsReceiver::new(hashes);
                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(hashes_receiver))
            }
            PendingTransactionFilterKind::Full => {
                // Full variant: subscribe to full transactions and keep a response builder
                // around to convert them into RPC transactions on drain.
                let txs = self.pool().new_pending_pool_transactions_listener();
                let resp_builder = self.inner.eth_api.tx_resp_builder().clone();
                let txs_receiver = FullTransactionsReceiver::new(txs, resp_builder);
                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
                    txs_receiver,
                )))
            }
        };

        self.inner.install_filter(filter_kind).await
    }

    /// Handler for `eth_getFilterChanges`.
    async fn filter_changes(
        &self,
        id: FilterId,
    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
        // `Self::filter_changes` resolves to the inherent method, not this trait method.
        let changes = Self::filter_changes(self, id).await?;
        Ok(changes)
    }

    /// Handler for `eth_getFilterLogs`.
    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
        // `Self::filter_logs` resolves to the inherent method, not this trait method.
        let logs = Self::filter_logs(self, id).await?;
        Ok(logs)
    }

    /// Handler for `eth_uninstallFilter`; returns whether a filter with `id` was removed.
    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
        let mut filters = self.inner.active_filters.inner.lock().await;
        let removed = filters.remove(&id).is_some();
        if removed {
            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
        }
        Ok(removed)
    }

    /// Handler for `eth_getLogs`: runs a one-off log query without installing a filter.
    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
        trace!(target: "rpc::eth", "Serving eth_getLogs");
        let logs = self.inner.logs_for_filter(filter).await?;
        Ok(logs)
    }
}
/// Opaque `Debug` output: prints only the type name, without requiring `Eth: Debug`.
impl<Eth> std::fmt::Debug for EthFilter<Eth>
where
    Eth: EthApiTypes,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("EthFilter");
        repr.finish_non_exhaustive()
    }
}
/// Shared state behind [`EthFilter`].
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
/// The wrapped eth API; gives access to the provider, pool, cache and response builder.
eth_api: Eth,
/// All currently installed filters, keyed by filter id.
active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
/// Generates the ids handed out for newly installed filters.
id_provider: Arc<dyn IdProvider>,
/// Maximum allowed `to_block - from_block` distance of a single log query.
max_blocks_per_filter: u64,
/// Maximum number of logs a query spanning more than one block may return.
max_logs_per_response: usize,
/// Step used to chunk a block range when fetching headers from the provider.
max_headers_range: u64,
/// Spawner used for the stale-filter cleanup task.
task_spawner: Box<dyn TaskSpawner>,
/// Time without a poll after which a filter is evicted; also the period of the cleanup task.
stale_filter_ttl: Duration,
}
impl<Eth> EthFilterInner<Eth>
where
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
{
/// Returns the provider exposed by the wrapped eth API.
fn provider(&self) -> &Eth::Provider {
    let api = &self.eth_api;
    api.provider()
}
/// Returns the block/receipt cache exposed by the wrapped eth API.
fn eth_cache(
    &self,
) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
    let api = &self.eth_api;
    api.cache()
}
/// Returns all logs matching `filter`.
///
/// A filter pinned to a block hash is answered from that single block; a range filter is
/// resolved to concrete block numbers and handed to `get_logs_in_block_range`.
///
/// # Errors
///
/// Fails if the header or receipts of the requested block hash cannot be found, and
/// propagates provider errors and the range-query errors.
async fn logs_for_filter(&self, filter: Filter) -> Result<Vec<Log>, EthFilterError> {
match filter.block_option {
FilterBlockOption::AtBlockHash(block_hash) => {
// The header supplies the block number and timestamp attached to the returned logs.
let header = self
.provider()
.header_by_hash_or_number(block_hash.into())?
.ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
let block_num_hash = BlockNumHash::new(header.number(), block_hash);
// Missing receipts for a known header are reported as a missing header as well.
let (receipts, maybe_block) = self
.receipts_and_maybe_block(
&block_num_hash,
self.provider().chain_info()?.best_number,
)
.await?
.ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
let mut all_logs = Vec::new();
// Use the cached block when one was returned alongside the receipts, otherwise let the
// helper go through the provider.
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
&FilteredParams::new(Some(filter)),
block_num_hash,
&receipts,
false,
header.timestamp(),
)?;
Ok(all_logs)
}
FilterBlockOption::Range { from_block, to_block } => {
let info = self.provider().chain_info()?;
// For a one-off query the current best block is passed as the start block.
let start_block = info.best_number;
// Resolve block tags/numbers to concrete block numbers; unresolved values become `None`.
let from = from_block
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
let to = to_block
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info);
self.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
.await
}
}
}
/// Registers a new filter of the given `kind` and returns its freshly generated id.
///
/// The filter starts at the current best block number and counts as polled "now".
async fn install_filter(
    &self,
    kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
) -> RpcResult<FilterId> {
    let best_block = self.provider().best_block_number().to_rpc_result()?;
    let id = FilterId::from(self.id_provider.next_id());

    let mut registry = self.active_filters.inner.lock().await;
    let entry = ActiveFilter { block: best_block, last_poll_timestamp: Instant::now(), kind };
    registry.insert(id.clone(), entry);

    Ok(id)
}
/// Returns all logs in the inclusive block range `from_block..=to_block` that match `filter`.
///
/// # Errors
///
/// - [`EthFilterError::InvalidBlockRangeParams`] if `to_block < from_block`.
/// - [`EthFilterError::QueryExceedsMaxBlocks`] if the range is wider than
///   `max_blocks_per_filter`.
/// - [`EthFilterError::QueryExceedsMaxResults`] if a multi-block query collects more than
///   `max_logs_per_response` logs.
/// - Provider and cache errors are propagated.
async fn get_logs_in_block_range(
&self,
filter: &Filter,
from_block: u64,
to_block: u64,
chain_info: ChainInfo,
) -> Result<Vec<Log>, EthFilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
if to_block < from_block {
return Err(EthFilterError::InvalidBlockRangeParams)
}
// Safe subtraction: `to_block >= from_block` was checked above.
if to_block - from_block > self.max_blocks_per_filter {
return Err(EthFilterError::QueryExceedsMaxBlocks(self.max_blocks_per_filter))
}
let mut all_logs = Vec::new();
let filter_params = FilteredParams::new(Some(filter.clone()));
// Bloom filters derived from the query, used to skip headers that cannot contain a match.
let address_filter = FilteredParams::address_filter(&filter.address);
let topics_filter = FilteredParams::topics_filter(&filter.topics);
// Walk the range in chunks so that only a bounded number of headers is loaded at once.
for (from, to) in
BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
{
let headers = self.provider().headers_range(from..=to)?;
for (idx, header) in headers.iter().enumerate() {
// Only fetch receipts for blocks whose logs bloom matches both address and topics.
if FilteredParams::matches_address(header.logs_bloom(), &address_filter) &&
FilteredParams::matches_topics(header.logs_bloom(), &topics_filter)
{
// The hash of this block is the parent hash of the next header in the chunk; only the
// last header of a chunk needs a provider lookup.
let block_hash = match headers.get(idx + 1) {
Some(parent) => parent.parent_hash(),
None => self
.provider()
.block_hash(header.number())?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
};
let num_hash = BlockNumHash::new(header.number(), block_hash);
// Blocks without available receipts are skipped silently.
if let Some((receipts, maybe_block)) =
self.receipts_and_maybe_block(&num_hash, chain_info.best_number).await?
{
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
&filter_params,
num_hash,
&receipts,
false,
header.timestamp(),
)?;
// The logs of a single-block query are allowed to exceed the limit.
let is_multi_block_range = from_block != to_block;
if is_multi_block_range && all_logs.len() > self.max_logs_per_response {
// Suggest retrying with a range that ends just before the block that exceeded the limit.
return Err(EthFilterError::QueryExceedsMaxResults {
max_logs: self.max_logs_per_response,
from_block,
to_block: num_hash.number.saturating_sub(1),
});
}
}
}
}
}
Ok(all_logs)
}
/// Fetches the receipts of the given block from the cache, plus the block itself when it is
/// one of the five most recent blocks (`best_number - 4 ..= best_number`).
///
/// Returns `Ok(None)` when the cache has nothing for the block hash.
async fn receipts_and_maybe_block(
    &self,
    block_num_hash: &BlockNumHash,
    best_number: u64,
) -> Result<
    Option<(
        Arc<Vec<ProviderReceipt<Eth::Provider>>>,
        Option<Arc<SealedBlockWithSenders<ProviderBlock<Eth::Provider>>>>,
    )>,
    EthFilterError,
> {
    let oldest_recent = best_number.saturating_sub(4);
    let is_recent = (oldest_recent..=best_number).contains(&block_num_hash.number);
    let cache = self.eth_cache();

    let found = if is_recent {
        // Recent block: request block and receipts together.
        let block_and_receipts = cache.get_block_and_receipts(block_num_hash.hash).await?;
        block_and_receipts.map(|(block, receipts)| (receipts, Some(block)))
    } else {
        // Older block: only the receipts are requested.
        let receipts = cache.get_receipts(block_num_hash.hash).await?;
        receipts.map(|receipts| (receipts, None))
    };

    Ok(found)
}
}
/// All currently installed filters, keyed by filter id.
///
/// Clones share the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters<T> {
// Async mutex: the guard is obtained with `.lock().await` throughout this file.
inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
}
impl<T> ActiveFilters<T> {
    /// Creates an empty filter registry.
    pub fn new() -> Self {
        let filters = HashMap::new();
        Self { inner: Arc::new(Mutex::new(filters)) }
    }
}
/// An installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
/// The block number the next poll starts from.
block: u64,
/// When the filter was installed or last polled; used to detect stale filters.
last_poll_timestamp: Instant,
/// What the filter watches.
kind: FilterKind<T>,
}
/// Receiver of pending transaction hashes that can be drained on demand.
///
/// Clones share the same underlying channel receiver.
#[derive(Debug, Clone)]
struct PendingTransactionsReceiver {
/// Channel receiving transaction hashes, behind an async mutex so `drain` can take `&self`.
txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
}
impl PendingTransactionsReceiver {
    /// Wraps a channel receiver of pending transaction hashes.
    fn new(receiver: Receiver<TxHash>) -> Self {
        let txs_receiver = Arc::new(Mutex::new(receiver));
        Self { txs_receiver }
    }

    /// Returns all transaction hashes that are immediately available, without waiting for
    /// new ones.
    async fn drain<T>(&self) -> FilterChanges<T> {
        let mut receiver = self.txs_receiver.lock().await;
        // Stops at the first `try_recv` error (channel empty or closed).
        let hashes: Vec<TxHash> = std::iter::from_fn(|| receiver.try_recv().ok()).collect();
        FilterChanges::Hashes(hashes)
    }
}
/// Receiver of full pending transactions that can be drained on demand and converted into
/// RPC transactions.
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
/// Stream of new pool transactions, behind an async mutex so `drain` can take `&self`.
txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
/// Builder used to convert pool transactions into RPC transaction responses.
tx_resp_builder: TxCompat,
}
impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: TransactionCompat<T::Consensus>,
{
    /// Wraps a stream of new pool transactions together with the response builder used to
    /// convert them.
    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
        let txs_stream = Arc::new(Mutex::new(stream));
        Self { txs_stream, tx_resp_builder }
    }

    /// Returns all transactions that are immediately available on the stream, converted to
    /// RPC transactions.
    ///
    /// Transactions that fail to convert are logged and skipped.
    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
        let mut converted = Vec::new();
        let mut stream = self.txs_stream.lock().await;

        while let Ok(new_tx) = stream.try_recv() {
            let consensus_tx = new_tx.transaction.to_consensus();
            match from_recovered(consensus_tx, &self.tx_resp_builder) {
                Ok(rpc_tx) => converted.push(rpc_tx),
                Err(err) => {
                    error!(target: "rpc",
                        %err,
                        "Failed to fill txn with block context"
                    );
                }
            }
        }

        FilterChanges::Transactions(converted)
    }
}
/// Object-safe abstraction over a source of full pending transactions.
///
/// Lets `PendingTransactionKind` store a receiver as a trait object without naming its
/// pool transaction and response builder types.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
/// Returns the changes currently available from the underlying source.
async fn drain(&self) -> FilterChanges<T>;
}
/// Exposes `FullTransactionsReceiver` through the `FullTransactionsFilter` trait object
/// interface.
#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<TxCompat::Transaction>
for FullTransactionsReceiver<T, TxCompat>
where
T: PoolTransaction + 'static,
TxCompat: TransactionCompat<T::Consensus> + 'static,
{
async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
// `Self::drain` resolves to the inherent `FullTransactionsReceiver::drain` (inherent
// associated functions take precedence over trait methods), so this does not recurse.
Self::drain(self).await
}
}
/// The two flavours of pending transaction filter.
#[derive(Debug, Clone)]
enum PendingTransactionKind<T> {
/// Yields only transaction hashes.
Hashes(PendingTransactionsReceiver),
/// Yields full transactions of type `T`, via a type-erased receiver.
FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
}
impl<T> PendingTransactionKind<T>
where
    T: 'static,
{
    /// Drains whichever receiver this filter wraps and returns the collected changes.
    async fn drain(&self) -> FilterChanges<T> {
        match self {
            Self::Hashes(hashes) => hashes.drain().await,
            Self::FullTransaction(full_txs) => full_txs.drain().await,
        }
    }
}
/// What an installed filter watches.
#[derive(Clone, Debug)]
enum FilterKind<T> {
/// Logs matching the boxed filter.
Log(Box<Filter>),
/// New blocks; polls return block hashes.
Block,
/// New pending transactions, as hashes or full transactions.
PendingTransaction(PendingTransactionKind<T>),
}
/// Iterator that splits an inclusive block range into consecutive, non-overlapping inclusive
/// sub-ranges `(start, end)` with `end - start <= step`.
///
/// For example `0..=10` with a step of `3` yields `(0, 3)`, `(4, 7)`, `(8, 10)`.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Maximum distance between the first and last block of a sub-range.
    step: u64,
    /// Last block of the overall range; sub-ranges never extend past it.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` in sub-ranges spanning at most `step + 1` blocks.
    ///
    /// `step` is clamped so that the stride `step + 1` always fits into a `usize`; this keeps
    /// `step == u64::MAX` (or a step wider than `usize` on narrow targets) from overflowing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Widening cast: `usize` is never wider than 64 bits on supported targets.
        let max_step = (usize::MAX as u64).saturating_sub(1);
        let step = step.min(max_step);
        // Lossless cast and non-overflowing addition: `step < usize::MAX` after clamping.
        let stride = step as usize + 1;
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate instead of overflowing for ranges that reach up to `u64::MAX`. `start`
        // comes from the range, so it never exceeds `self.end`; hence `start <= end` holds.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
/// Errors that can occur in the handler implementation
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
/// No matching filter is installed under the given id.
#[error("filter not found")]
FilterNotFound(FilterId),
/// The requested block range is invalid (`to_block` is lower than `from_block`).
#[error("invalid block range params")]
InvalidBlockRangeParams,
/// The requested block range is wider than the contained maximum.
#[error("query exceeds max block range {0}")]
QueryExceedsMaxBlocks(u64),
/// A multi-block query collected more logs than allowed.
#[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
QueryExceedsMaxResults {
/// Maximum number of logs allowed per response.
max_logs: usize,
/// First block of the suggested retry range.
from_block: u64,
/// Last block of the suggested retry range.
to_block: u64,
},
/// Error originating from the eth API or, via conversion, from the provider.
#[error(transparent)]
EthAPIError(#[from] EthApiError),
/// Generic internal error.
#[error("internal filter error")]
InternalError,
}
impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
fn from(err: EthFilterError) -> Self {
match err {
EthFilterError::FilterNotFound(_) => rpc_error_with_code(
jsonrpsee::types::error::INVALID_PARAMS_CODE,
"filter not found",
),
err @ EthFilterError::InternalError => {
rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
}
EthFilterError::EthAPIError(err) => err.into(),
err @ (EthFilterError::InvalidBlockRangeParams |
EthFilterError::QueryExceedsMaxBlocks(_) |
EthFilterError::QueryExceedsMaxResults { .. }) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
}
}
}
}
impl From<ProviderError> for EthFilterError {
fn from(err: ProviderError) -> Self {
Self::EthAPIError(err.into())
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::Rng;
use reth_testing_utils::generators;
// Checks on a randomly generated range and step that the sub-ranges start at the range
// start, are contiguous, and finish exactly at the range end.
#[test]
fn test_block_range_iter() {
let mut rng = generators::rng();
// Draw from narrower integer types so that neither the range end nor `from + step` can
// overflow `u64`.
let start = rng.gen::<u32>() as u64;
let end = start.saturating_add(rng.gen::<u32>() as u64);
let step = rng.gen::<u16>() as u64;
let range = start..=end;
let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
// The first sub-range begins at the range start; note that `end` is shadowed here and from
// now on tracks the end of the most recent sub-range.
let (from, mut end) = iter.next().unwrap();
assert_eq!(from, start);
assert_eq!(end, (from + step).min(*range.end()));
// Every following sub-range must begin right after the previous one ended.
for (next_from, next_end) in iter {
assert_eq!(next_from, end + 1);
end = next_end;
}
// The last sub-range must end exactly at the end of the overall range.
assert_eq!(end, *range.end());
}
}