1use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
7 PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::future::TryFutureExt;
11use jsonrpsee::{core::RpcResult, server::IdProvider};
12use reth_errors::ProviderError;
13use reth_rpc_eth_api::{
14 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcNodeCoreExt,
15 RpcTransaction, TransactionCompat,
16};
17use reth_rpc_eth_types::{
18 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
19 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
20};
21use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
22use reth_storage_api::{
23 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
24 ProviderReceipt,
25};
26use reth_tasks::TaskSpawner;
27use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
28use std::{
29 collections::HashMap,
30 fmt,
31 future::Future,
32 iter::StepBy,
33 ops::RangeInclusive,
34 sync::Arc,
35 time::{Duration, Instant},
36};
37use tokio::{
38 sync::{mpsc::Receiver, Mutex},
39 time::MissedTickBehavior,
40};
41use tracing::{error, trace};
42
43impl<Eth> EngineEthFilter for EthFilter<Eth>
44where
45 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
46{
47 fn logs(
49 &self,
50 filter: Filter,
51 limits: QueryLimits,
52 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
53 trace!(target: "rpc::eth", "Serving eth_getLogs");
54 self.inner.logs_for_filter(filter, limits).map_err(|e| e.into())
55 }
56}
57
/// Step used to split a block-range log query into chunks of headers.
///
/// The chunking iterator is inclusive on both ends, so a single chunk spans up to
/// `MAX_HEADERS_RANGE + 1` headers.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// `eth_` filter RPC implementation.
///
/// Handles installing, polling and uninstalling filters as well as direct log queries
/// (`eth_getLogs`). The type is a thin handle around shared state and is cheap to clone.
pub struct EthFilter<Eth: EthApiTypes> {
    /// All nested state, bundled behind a single reference-counted pointer.
    inner: Arc<EthFilterInner<Eth>>,
}
68
69impl<Eth> Clone for EthFilter<Eth>
70where
71 Eth: EthApiTypes,
72{
73 fn clone(&self) -> Self {
74 Self { inner: self.inner.clone() }
75 }
76}
77
78impl<Eth> EthFilter<Eth>
79where
80 Eth: EthApiTypes + 'static,
81{
82 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
110 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
111 config;
112 let inner = EthFilterInner {
113 eth_api,
114 active_filters: ActiveFilters::new(),
115 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
116 max_headers_range: MAX_HEADERS_RANGE,
117 task_spawner,
118 stale_filter_ttl,
119 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
120 };
121
122 let eth_filter = Self { inner: Arc::new(inner) };
123
124 let this = eth_filter.clone();
125 eth_filter.inner.task_spawner.spawn_critical(
126 "eth-filters_stale-filters-clean",
127 Box::pin(async move {
128 this.watch_and_clear_stale_filters().await;
129 }),
130 );
131
132 eth_filter
133 }
134
135 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
137 &self.inner.active_filters
138 }
139
140 async fn watch_and_clear_stale_filters(&self) {
143 let mut interval = tokio::time::interval_at(
144 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
145 self.inner.stale_filter_ttl,
146 );
147 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
148 loop {
149 interval.tick().await;
150 self.clear_stale_filters(Instant::now()).await;
151 }
152 }
153
154 pub async fn clear_stale_filters(&self, now: Instant) {
157 trace!(target: "rpc::eth", "clear stale filters");
158 self.active_filters().inner.lock().await.retain(|id, filter| {
159 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
160
161 if !is_valid {
162 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
163 }
164
165 is_valid
166 })
167 }
168}
169
170impl<Eth> EthFilter<Eth>
171where
172 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
173{
174 fn provider(&self) -> &Eth::Provider {
176 self.inner.eth_api.provider()
177 }
178
179 fn pool(&self) -> &Eth::Pool {
181 self.inner.eth_api.pool()
182 }
183
184 pub async fn filter_changes(
186 &self,
187 id: FilterId,
188 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
189 let info = self.provider().chain_info()?;
190 let best_number = info.best_number;
191
192 let (start_block, kind) = {
195 let mut filters = self.inner.active_filters.inner.lock().await;
196 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
197
198 if filter.block > best_number {
199 return Ok(FilterChanges::Empty)
201 }
202
203 let mut block = best_number + 1;
207 std::mem::swap(&mut filter.block, &mut block);
208 filter.last_poll_timestamp = Instant::now();
209
210 (block, filter.kind.clone())
211 };
212
213 match kind {
214 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
215 FilterKind::Block => {
216 let end_block = best_number + 1;
219 let block_hashes =
220 self.provider().canonical_hashes_range(start_block, end_block).map_err(
221 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
222 )?;
223 Ok(FilterChanges::Hashes(block_hashes))
224 }
225 FilterKind::Log(filter) => {
226 let (from_block_number, to_block_number) = match filter.block_option {
227 FilterBlockOption::Range { from_block, to_block } => {
228 let from = from_block
229 .map(|num| self.provider().convert_block_number(num))
230 .transpose()?
231 .flatten();
232 let to = to_block
233 .map(|num| self.provider().convert_block_number(num))
234 .transpose()?
235 .flatten();
236 logs_utils::get_filter_block_range(from, to, start_block, info)
237 }
238 FilterBlockOption::AtBlockHash(_) => {
239 (start_block, best_number)
243 }
244 };
245 let logs = self
246 .inner
247 .get_logs_in_block_range(
248 &filter,
249 from_block_number,
250 to_block_number,
251 self.inner.query_limits,
252 )
253 .await?;
254 Ok(FilterChanges::Logs(logs))
255 }
256 }
257 }
258
259 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
265 let filter = {
266 let filters = self.inner.active_filters.inner.lock().await;
267 if let FilterKind::Log(ref filter) =
268 filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
269 {
270 *filter.clone()
271 } else {
272 return Err(EthFilterError::FilterNotFound(id))
274 }
275 };
276
277 self.inner.logs_for_filter(filter, self.inner.query_limits).await
278 }
279}
280
281#[async_trait]
282impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
283where
284 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
285{
286 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
288 trace!(target: "rpc::eth", "Serving eth_newFilter");
289 self.inner
290 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
291 .await
292 }
293
294 async fn new_block_filter(&self) -> RpcResult<FilterId> {
296 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
297 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
298 }
299
300 async fn new_pending_transaction_filter(
302 &self,
303 kind: Option<PendingTransactionFilterKind>,
304 ) -> RpcResult<FilterId> {
305 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
306
307 let transaction_kind = match kind.unwrap_or_default() {
308 PendingTransactionFilterKind::Hashes => {
309 let receiver = self.pool().pending_transactions_listener();
310 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
311 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
312 }
313 PendingTransactionFilterKind::Full => {
314 let stream = self.pool().new_pending_pool_transactions_listener();
315 let full_txs_receiver = FullTransactionsReceiver::new(
316 stream,
317 self.inner.eth_api.tx_resp_builder().clone(),
318 );
319 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
320 full_txs_receiver,
321 )))
322 }
323 };
324
325 self.inner.install_filter(transaction_kind).await
329 }
330
331 async fn filter_changes(
333 &self,
334 id: FilterId,
335 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
336 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
337 Ok(Self::filter_changes(self, id).await?)
338 }
339
340 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
346 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
347 Ok(Self::filter_logs(self, id).await?)
348 }
349
350 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
352 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
353 let mut filters = self.inner.active_filters.inner.lock().await;
354 if filters.remove(&id).is_some() {
355 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
356 Ok(true)
357 } else {
358 Ok(false)
359 }
360 }
361
362 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
366 trace!(target: "rpc::eth", "Serving eth_getLogs");
367 Ok(self.inner.logs_for_filter(filter, self.inner.query_limits).await?)
368 }
369}
370
371impl<Eth> std::fmt::Debug for EthFilter<Eth>
372where
373 Eth: EthApiTypes,
374{
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 f.debug_struct("EthFilter").finish_non_exhaustive()
377 }
378}
379
/// Container type for the state shared by all clones of an `EthFilter`.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation; supplies the provider, the transaction pool, the state
    /// cache and the transaction response builder used by the filter.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Generates the ids of newly installed filters.
    id_provider: Arc<dyn IdProvider>,
    /// Limits applied to log queries served through the filter RPC handlers.
    query_limits: QueryLimits,
    /// Step used to split a block range into chunks of headers that are read at once.
    max_headers_range: u64,
    /// Spawner for the background task that evicts stale filters.
    task_spawner: Box<dyn TaskSpawner>,
    /// How long a filter may go without being polled before it is evicted; also used as the
    /// period of the eviction task.
    stale_filter_ttl: Duration,
}
398
399impl<Eth> EthFilterInner<Eth>
400where
401 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
402{
403 fn provider(&self) -> &Eth::Provider {
405 self.eth_api.provider()
406 }
407
408 fn eth_cache(
410 &self,
411 ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
412 self.eth_api.cache()
413 }
414
415 async fn logs_for_filter(
417 &self,
418 filter: Filter,
419 limits: QueryLimits,
420 ) -> Result<Vec<Log>, EthFilterError> {
421 match filter.block_option {
422 FilterBlockOption::AtBlockHash(block_hash) => {
423 let header = self
426 .provider()
427 .header_by_hash_or_number(block_hash.into())?
428 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
429
430 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
431
432 let (receipts, maybe_block) = self
435 .eth_cache()
436 .get_receipts_and_maybe_block(block_num_hash.hash)
437 .await?
438 .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
439
440 let mut all_logs = Vec::new();
441 append_matching_block_logs(
442 &mut all_logs,
443 maybe_block
444 .map(ProviderOrBlock::Block)
445 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
446 &FilteredParams::new(Some(filter)),
447 block_num_hash,
448 &receipts,
449 false,
450 header.timestamp(),
451 )?;
452
453 Ok(all_logs)
454 }
455 FilterBlockOption::Range { from_block, to_block } => {
456 let info = self.provider().chain_info()?;
458
459 let start_block = info.best_number;
461 let from = from_block
462 .map(|num| self.provider().convert_block_number(num))
463 .transpose()?
464 .flatten();
465 let to = to_block
466 .map(|num| self.provider().convert_block_number(num))
467 .transpose()?
468 .flatten();
469 let (from_block_number, to_block_number) =
470 logs_utils::get_filter_block_range(from, to, start_block, info);
471 self.get_logs_in_block_range(&filter, from_block_number, to_block_number, limits)
472 .await
473 }
474 }
475 }
476
477 async fn install_filter(
479 &self,
480 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
481 ) -> RpcResult<FilterId> {
482 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
483 let id = FilterId::from(self.id_provider.next_id());
484 let mut filters = self.active_filters.inner.lock().await;
485 filters.insert(
486 id.clone(),
487 ActiveFilter {
488 block: last_poll_block_number,
489 last_poll_timestamp: Instant::now(),
490 kind,
491 },
492 );
493 Ok(id)
494 }
495
496 async fn get_logs_in_block_range(
502 &self,
503 filter: &Filter,
504 from_block: u64,
505 to_block: u64,
506 limits: QueryLimits,
507 ) -> Result<Vec<Log>, EthFilterError> {
508 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
509
510 if to_block < from_block {
511 return Err(EthFilterError::InvalidBlockRangeParams)
512 }
513
514 if let Some(max_blocks_per_filter) =
515 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
516 {
517 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
518 }
519
520 let mut all_logs = Vec::new();
521 let filter_params = FilteredParams::new(Some(filter.clone()));
522
523 let address_filter = FilteredParams::address_filter(&filter.address);
525 let topics_filter = FilteredParams::topics_filter(&filter.topics);
526
527 for (from, to) in
530 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
531 {
532 let headers = self.provider().headers_range(from..=to)?;
533
534 for (idx, header) in headers.iter().enumerate() {
535 if FilteredParams::matches_address(header.logs_bloom(), &address_filter) &&
537 FilteredParams::matches_topics(header.logs_bloom(), &topics_filter)
538 {
539 let block_hash = match headers.get(idx + 1) {
542 Some(parent) => parent.parent_hash(),
543 None => self
544 .provider()
545 .block_hash(header.number())?
546 .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
547 };
548
549 let num_hash = BlockNumHash::new(header.number(), block_hash);
550 if let Some((receipts, maybe_block)) =
551 self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
552 {
553 append_matching_block_logs(
554 &mut all_logs,
555 maybe_block
556 .map(ProviderOrBlock::Block)
557 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
558 &filter_params,
559 num_hash,
560 &receipts,
561 false,
562 header.timestamp(),
563 )?;
564
565 let is_multi_block_range = from_block != to_block;
568 if let Some(max_logs_per_response) = limits.max_logs_per_response {
569 if is_multi_block_range && all_logs.len() > max_logs_per_response {
570 return Err(EthFilterError::QueryExceedsMaxResults {
571 max_logs: max_logs_per_response,
572 from_block,
573 to_block: num_hash.number.saturating_sub(1),
574 });
575 }
576 }
577 }
578 }
579 }
580 }
581
582 Ok(all_logs)
583 }
584}
585
/// All currently installed filters, keyed by their id.
///
/// The map sits behind a shared async mutex, so clones refer to the same set of filters.
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters<T> {
    /// Shared, lock-protected map from filter id to filter state.
    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
}
591
592impl<T> ActiveFilters<T> {
593 pub fn new() -> Self {
595 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
596 }
597}
598
/// An installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// The first block whose changes have not been reported yet: the best block number at
    /// installation, afterwards the best block number at the last poll plus one.
    block: u64,
    /// When the filter was installed or last polled; used to detect stale filters.
    last_poll_timestamp: Instant,
    /// What kind of changes this filter reports.
    kind: FilterKind<T>,
}
609
/// Receiver for the hashes of pending transactions.
///
/// The channel receiver sits behind a shared async mutex so the type can be cloned.
#[derive(Debug, Clone)]
struct PendingTransactionsReceiver {
    /// Receiving half of the channel that delivers pending transaction hashes.
    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
}
615
616impl PendingTransactionsReceiver {
617 fn new(receiver: Receiver<TxHash>) -> Self {
618 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
619 }
620
621 async fn drain<T>(&self) -> FilterChanges<T> {
623 let mut pending_txs = Vec::new();
624 let mut prepared_stream = self.txs_receiver.lock().await;
625
626 while let Ok(tx_hash) = prepared_stream.try_recv() {
627 pending_txs.push(tx_hash);
628 }
629
630 FilterChanges::Hashes(pending_txs)
632 }
633}
634
/// Receiver for full pending transactions.
///
/// Pairs the pool's transaction stream with the builder that converts pool transactions into
/// their RPC representation.
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
    /// Stream of new pool transactions, shared behind an async mutex.
    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
    /// Builds the RPC transaction response from a consensus transaction.
    tx_resp_builder: TxCompat,
}
641
642impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
643where
644 T: PoolTransaction + 'static,
645 TxCompat: TransactionCompat<T::Consensus>,
646{
647 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
649 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
650 }
651
652 async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
654 let mut pending_txs = Vec::new();
655 let mut prepared_stream = self.txs_stream.lock().await;
656
657 while let Ok(tx) = prepared_stream.try_recv() {
658 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
659 Ok(tx) => pending_txs.push(tx),
660 Err(err) => {
661 error!(target: "rpc",
662 %err,
663 "Failed to fill txn with block context"
664 );
665 }
666 }
667 }
668 FilterChanges::Transactions(pending_txs)
669 }
670}
671
/// Object-safe abstraction over a source of full pending transactions.
///
/// Lets a pending-transaction filter hold the receiver as a trait object that is generic only
/// over the RPC transaction type `T`.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the transactions accumulated since the previous call.
    async fn drain(&self) -> FilterChanges<T>;
}
677
678#[async_trait]
679impl<T, TxCompat> FullTransactionsFilter<TxCompat::Transaction>
680 for FullTransactionsReceiver<T, TxCompat>
681where
682 T: PoolTransaction + 'static,
683 TxCompat: TransactionCompat<T::Consensus> + 'static,
684{
685 async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
686 Self::drain(self).await
687 }
688}
689
/// The kind of data a pending-transaction filter reports.
///
/// Mirrors the two variants of `PendingTransactionFilterKind` requested over RPC.
#[derive(Debug, Clone)]
enum PendingTransactionKind<T> {
    /// Report only the hashes of pending transactions.
    Hashes(PendingTransactionsReceiver),
    /// Report full pending transactions in their RPC representation.
    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
}
700
701impl<T: 'static> PendingTransactionKind<T> {
702 async fn drain(&self) -> FilterChanges<T> {
703 match self {
704 Self::Hashes(receiver) => receiver.drain().await,
705 Self::FullTransaction(receiver) => receiver.drain().await,
706 }
707 }
708}
709
/// The kind of an installed filter.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// Reports logs matching the boxed filter (`eth_newFilter`).
    Log(Box<Filter>),
    /// Reports hashes of new blocks (`eth_newBlockFilter`).
    Block,
    /// Reports pending transactions (`eth_newPendingTransactionFilter`).
    PendingTransaction(PendingTransactionKind<T>),
}
716
/// Iterator that splits an inclusive block range into consecutive, non-overlapping inclusive
/// chunks.
///
/// Each yielded `(start, end)` pair covers at most `step + 1` blocks; the final chunk is
/// truncated so it ends exactly at the end of the range.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block of the whole range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose chunks span `step + 1` blocks each.
    ///
    /// A `step` of `0` yields one chunk per block. An empty range yields nothing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // A chunk covers `step + 1` blocks, which is also the distance between chunk starts.
        // Saturate instead of overflowing for huge steps (or on targets where `usize` is
        // narrower than `u64`).
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        // Keep the chunk length in lock-step with the stride, even when the stride was clamped,
        // so chunks can never overlap.
        let step = u64::try_from(stride - 1).unwrap_or(u64::MAX);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next `(start, end)` chunk, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start` never exceeds `self.end` because it is drawn from the range itself, so the
        // clamped end is always `>= start`. Saturating keeps this true right up to `u64::MAX`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
743
/// Errors that can occur while installing, polling or querying filters.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// No filter with the given id is installed, or the installed filter is not of the kind the
    /// request requires.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// The requested block range ends before it starts.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// The requested block range spans more blocks than the contained limit allows.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// A multi-block query collected more logs than allowed.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// The maximum number of logs allowed in one response.
        max_logs: usize,
        /// Start of the range suggested for a retry.
        from_block: u64,
        /// End of the range suggested for a retry.
        to_block: u64,
    },
    /// Error raised by the underlying `eth` API or provider.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Generic internal error; reported to clients with the JSON-RPC internal error code.
    #[error("internal filter error")]
    InternalError,
}
773
774impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
775 fn from(err: EthFilterError) -> Self {
776 match err {
777 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
778 jsonrpsee::types::error::INVALID_PARAMS_CODE,
779 "filter not found",
780 ),
781 err @ EthFilterError::InternalError => {
782 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
783 }
784 EthFilterError::EthAPIError(err) => err.into(),
785 err @ (EthFilterError::InvalidBlockRangeParams |
786 EthFilterError::QueryExceedsMaxBlocks(_) |
787 EthFilterError::QueryExceedsMaxResults { .. }) => {
788 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
789 }
790 }
791 }
792}
793
794impl From<ProviderError> for EthFilterError {
795 fn from(err: ProviderError) -> Self {
796 Self::EthAPIError(err.into())
797 }
798}
799
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use reth_testing_utils::generators;

    /// Checks that the chunks of a random range start at the range start, follow each other
    /// without gaps or overlap, and end exactly at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        let first_block = rng.random::<u32>() as u64;
        let last_block = first_block.saturating_add(rng.random::<u32>() as u64);
        let step = rng.random::<u16>() as u64;

        let mut chunks = BlockRangeInclusiveIter::new(first_block..=last_block, step);

        let (from, first_end) = chunks.next().unwrap();
        assert_eq!(from, first_block);
        assert_eq!(first_end, (from + step).min(last_block));

        let final_end = chunks.fold(first_end, |prev_end, (next_from, next_end)| {
            // every chunk starts right after the previous one ended
            assert_eq!(next_from, prev_end + 1);
            next_end
        });

        assert_eq!(final_end, last_block);
    }
}