reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::future::TryFutureExt;
11use jsonrpsee::{core::RpcResult, server::IdProvider};
12use reth_errors::ProviderError;
13use reth_primitives_traits::NodePrimitives;
14use reth_rpc_eth_api::{
15    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcNodeCore,
16    RpcNodeCoreExt, RpcTransaction, TransactionCompat,
17};
18use reth_rpc_eth_types::{
19    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
20    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
21};
22use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
23use reth_storage_api::{
24    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
25    ProviderReceipt, TransactionsProvider,
26};
27use reth_tasks::TaskSpawner;
28use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
29use std::{
30    collections::HashMap,
31    fmt,
32    future::Future,
33    iter::StepBy,
34    ops::RangeInclusive,
35    sync::Arc,
36    time::{Duration, Instant},
37};
38use tokio::{
39    sync::{mpsc::Receiver, oneshot, Mutex},
40    time::MissedTickBehavior,
41};
42use tracing::{error, trace};
43
44impl<Eth> EngineEthFilter for EthFilter<Eth>
45where
46    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
47{
48    /// Returns logs matching given filter object, no query limits
49    fn logs(
50        &self,
51        filter: Filter,
52        limits: QueryLimits,
53    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
54        trace!(target: "rpc::eth", "Serving eth_getLogs");
55        self.logs_for_filter(filter, limits).map_err(|e| e.into())
56    }
57}
58
/// The maximum number of headers we read at once when handling a range filter.
///
/// [`EthFilter::new`] stores this value as `max_headers_range`, which is then used as the step
/// size when a block range is split into chunks.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
61
62/// `Eth` filter RPC implementation.
63///
64/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
65pub struct EthFilter<Eth: EthApiTypes> {
66    /// All nested fields bundled together
67    inner: Arc<EthFilterInner<Eth>>,
68}
69
70impl<Eth> Clone for EthFilter<Eth>
71where
72    Eth: EthApiTypes,
73{
74    fn clone(&self) -> Self {
75        Self { inner: self.inner.clone() }
76    }
77}
78
79impl<Eth> EthFilter<Eth>
80where
81    Eth: EthApiTypes + 'static,
82{
83    /// Creates a new, shareable instance.
84    ///
85    /// This uses the given pool to get notified about new transactions, the provider to interact
86    /// with the blockchain, the cache to fetch cacheable data, like the logs.
87    ///
88    /// See also [`EthFilterConfig`].
89    ///
90    /// This also spawns a task that periodically clears stale filters.
91    ///
92    /// # Create a new instance with [`EthApi`](crate::EthApi)
93    ///
94    /// ```no_run
95    /// use reth_evm_ethereum::EthEvmConfig;
96    /// use reth_network_api::noop::NoopNetwork;
97    /// use reth_provider::noop::NoopProvider;
98    /// use reth_rpc::{EthApi, EthFilter};
99    /// use reth_tasks::TokioTaskExecutor;
100    /// use reth_transaction_pool::noop::NoopTransactionPool;
101    /// let eth_api = EthApi::builder(
102    ///     NoopProvider::default(),
103    ///     NoopTransactionPool::default(),
104    ///     NoopNetwork::default(),
105    ///     EthEvmConfig::mainnet(),
106    /// )
107    /// .build();
108    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
109    /// ```
110    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
111        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
112            config;
113        let inner = EthFilterInner {
114            eth_api,
115            active_filters: ActiveFilters::new(),
116            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
117            max_headers_range: MAX_HEADERS_RANGE,
118            task_spawner,
119            stale_filter_ttl,
120            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
121        };
122
123        let eth_filter = Self { inner: Arc::new(inner) };
124
125        let this = eth_filter.clone();
126        eth_filter.inner.task_spawner.spawn_critical(
127            "eth-filters_stale-filters-clean",
128            Box::pin(async move {
129                this.watch_and_clear_stale_filters().await;
130            }),
131        );
132
133        eth_filter
134    }
135
136    /// Returns all currently active filters
137    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
138        &self.inner.active_filters
139    }
140
141    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
142    /// Nonetheless, this endless future frees the thread at every await point.
143    async fn watch_and_clear_stale_filters(&self) {
144        let mut interval = tokio::time::interval_at(
145            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
146            self.inner.stale_filter_ttl,
147        );
148        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
149        loop {
150            interval.tick().await;
151            self.clear_stale_filters(Instant::now()).await;
152        }
153    }
154
155    /// Clears all filters that have not been polled for longer than the configured
156    /// `stale_filter_ttl` at the given instant.
157    pub async fn clear_stale_filters(&self, now: Instant) {
158        trace!(target: "rpc::eth", "clear stale filters");
159        self.active_filters().inner.lock().await.retain(|id, filter| {
160            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
161
162            if !is_valid {
163                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
164            }
165
166            is_valid
167        })
168    }
169}
170
171impl<Eth> EthFilter<Eth>
172where
173    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
174{
175    /// Access the underlying provider.
176    fn provider(&self) -> &Eth::Provider {
177        self.inner.eth_api.provider()
178    }
179
180    /// Access the underlying pool.
181    fn pool(&self) -> &Eth::Pool {
182        self.inner.eth_api.pool()
183    }
184
185    /// Returns all the filter changes for the given id, if any
186    pub async fn filter_changes(
187        &self,
188        id: FilterId,
189    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
190        let info = self.provider().chain_info()?;
191        let best_number = info.best_number;
192
193        // start_block is the block from which we should start fetching changes, the next block from
194        // the last time changes were polled, in other words the best block at last poll + 1
195        let (start_block, kind) = {
196            let mut filters = self.inner.active_filters.inner.lock().await;
197            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
198
199            if filter.block > best_number {
200                // no new blocks since the last poll
201                return Ok(FilterChanges::Empty)
202            }
203
204            // update filter
205            // we fetch all changes from [filter.block..best_block], so we advance the filter's
206            // block to `best_block +1`, the next from which we should start fetching changes again
207            let mut block = best_number + 1;
208            std::mem::swap(&mut filter.block, &mut block);
209            filter.last_poll_timestamp = Instant::now();
210
211            (block, filter.kind.clone())
212        };
213
214        match kind {
215            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
216            FilterKind::Block => {
217                // Note: we need to fetch the block hashes from inclusive range
218                // [start_block..best_block]
219                let end_block = best_number + 1;
220                let block_hashes =
221                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
222                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
223                    )?;
224                Ok(FilterChanges::Hashes(block_hashes))
225            }
226            FilterKind::Log(filter) => {
227                let (from_block_number, to_block_number) = match filter.block_option {
228                    FilterBlockOption::Range { from_block, to_block } => {
229                        let from = from_block
230                            .map(|num| self.provider().convert_block_number(num))
231                            .transpose()?
232                            .flatten();
233                        let to = to_block
234                            .map(|num| self.provider().convert_block_number(num))
235                            .transpose()?
236                            .flatten();
237                        logs_utils::get_filter_block_range(from, to, start_block, info)
238                    }
239                    FilterBlockOption::AtBlockHash(_) => {
240                        // blockHash is equivalent to fromBlock = toBlock = the block number with
241                        // hash blockHash
242                        // get_logs_in_block_range is inclusive
243                        (start_block, best_number)
244                    }
245                };
246                let logs = self
247                    .inner
248                    .clone()
249                    .get_logs_in_block_range(
250                        *filter,
251                        from_block_number,
252                        to_block_number,
253                        self.inner.query_limits,
254                    )
255                    .await?;
256                Ok(FilterChanges::Logs(logs))
257            }
258        }
259    }
260
261    /// Returns an array of all logs matching filter with given id.
262    ///
263    /// Returns an error if no matching log filter exists.
264    ///
265    /// Handler for `eth_getFilterLogs`
266    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
267        let filter = {
268            let filters = self.inner.active_filters.inner.lock().await;
269            if let FilterKind::Log(ref filter) =
270                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
271            {
272                *filter.clone()
273            } else {
274                // Not a log filter
275                return Err(EthFilterError::FilterNotFound(id))
276            }
277        };
278
279        self.logs_for_filter(filter, self.inner.query_limits).await
280    }
281
282    /// Returns logs matching given filter object.
283    async fn logs_for_filter(
284        &self,
285        filter: Filter,
286        limits: QueryLimits,
287    ) -> Result<Vec<Log>, EthFilterError> {
288        self.inner.clone().logs_for_filter(filter, limits).await
289    }
290}
291
292#[async_trait]
293impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
294where
295    Eth: FullEthApiTypes
296        + RpcNodeCoreExt<
297            Provider: BlockIdReader,
298            Primitives: NodePrimitives<
299                SignedTx = <<Eth as RpcNodeCore>::Provider as TransactionsProvider>::Transaction,
300            >,
301        > + 'static,
302{
303    /// Handler for `eth_newFilter`
304    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
305        trace!(target: "rpc::eth", "Serving eth_newFilter");
306        self.inner
307            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
308            .await
309    }
310
311    /// Handler for `eth_newBlockFilter`
312    async fn new_block_filter(&self) -> RpcResult<FilterId> {
313        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
314        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
315    }
316
317    /// Handler for `eth_newPendingTransactionFilter`
318    async fn new_pending_transaction_filter(
319        &self,
320        kind: Option<PendingTransactionFilterKind>,
321    ) -> RpcResult<FilterId> {
322        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
323
324        let transaction_kind = match kind.unwrap_or_default() {
325            PendingTransactionFilterKind::Hashes => {
326                let receiver = self.pool().pending_transactions_listener();
327                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
328                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
329            }
330            PendingTransactionFilterKind::Full => {
331                let stream = self.pool().new_pending_pool_transactions_listener();
332                let full_txs_receiver = FullTransactionsReceiver::new(
333                    stream,
334                    self.inner.eth_api.tx_resp_builder().clone(),
335                );
336                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
337                    full_txs_receiver,
338                )))
339            }
340        };
341
342        //let filter = FilterKind::PendingTransaction(transaction_kind);
343
344        // Install the filter and propagate any errors
345        self.inner.install_filter(transaction_kind).await
346    }
347
348    /// Handler for `eth_getFilterChanges`
349    async fn filter_changes(
350        &self,
351        id: FilterId,
352    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
353        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
354        Ok(Self::filter_changes(self, id).await?)
355    }
356
357    /// Returns an array of all logs matching filter with given id.
358    ///
359    /// Returns an error if no matching log filter exists.
360    ///
361    /// Handler for `eth_getFilterLogs`
362    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
363        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
364        Ok(Self::filter_logs(self, id).await?)
365    }
366
367    /// Handler for `eth_uninstallFilter`
368    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
369        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
370        let mut filters = self.inner.active_filters.inner.lock().await;
371        if filters.remove(&id).is_some() {
372            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
373            Ok(true)
374        } else {
375            Ok(false)
376        }
377    }
378
379    /// Returns logs matching given filter object.
380    ///
381    /// Handler for `eth_getLogs`
382    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
383        trace!(target: "rpc::eth", "Serving eth_getLogs");
384        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
385    }
386}
387
388impl<Eth> std::fmt::Debug for EthFilter<Eth>
389where
390    Eth: EthApiTypes,
391{
392    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393        f.debug_struct("EthFilter").finish_non_exhaustive()
394    }
395}
396
/// Container type for the state shared by all clones of an [`EthFilter`].
///
/// [`EthFilter`] holds this behind an `Arc`, so every clone and every task spawned by the
/// filter operates on the same set of installed filters.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// limits for logs queries
    query_limits: QueryLimits,
    /// maximum number of headers to read at once for range filter
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
415
416impl<Eth> EthFilterInner<Eth>
417where
418    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
419        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
420        + 'static,
421{
422    /// Access the underlying provider.
423    fn provider(&self) -> &Eth::Provider {
424        self.eth_api.provider()
425    }
426
427    /// Access the underlying [`EthStateCache`].
428    fn eth_cache(
429        &self,
430    ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
431        self.eth_api.cache()
432    }
433
434    /// Returns logs matching given filter object.
435    async fn logs_for_filter(
436        self: Arc<Self>,
437        filter: Filter,
438        limits: QueryLimits,
439    ) -> Result<Vec<Log>, EthFilterError> {
440        match filter.block_option {
441            FilterBlockOption::AtBlockHash(block_hash) => {
442                // for all matching logs in the block
443                // get the block header with the hash
444                let header = self
445                    .provider()
446                    .header_by_hash_or_number(block_hash.into())?
447                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
448
449                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
450
451                // we also need to ensure that the receipts are available and return an error if
452                // not, in case the block hash been reorged
453                let (receipts, maybe_block) = self
454                    .eth_cache()
455                    .get_receipts_and_maybe_block(block_num_hash.hash)
456                    .await?
457                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
458
459                let mut all_logs = Vec::new();
460                append_matching_block_logs(
461                    &mut all_logs,
462                    maybe_block
463                        .map(ProviderOrBlock::Block)
464                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
465                    &filter,
466                    block_num_hash,
467                    &receipts,
468                    false,
469                    header.timestamp(),
470                )?;
471
472                Ok(all_logs)
473            }
474            FilterBlockOption::Range { from_block, to_block } => {
475                // compute the range
476                let info = self.provider().chain_info()?;
477
478                // we start at the most recent block if unset in filter
479                let start_block = info.best_number;
480                let from = from_block
481                    .map(|num| self.provider().convert_block_number(num))
482                    .transpose()?
483                    .flatten();
484                let to = to_block
485                    .map(|num| self.provider().convert_block_number(num))
486                    .transpose()?
487                    .flatten();
488                let (from_block_number, to_block_number) =
489                    logs_utils::get_filter_block_range(from, to, start_block, info);
490                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
491                    .await
492            }
493        }
494    }
495
496    /// Installs a new filter and returns the new identifier.
497    async fn install_filter(
498        &self,
499        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
500    ) -> RpcResult<FilterId> {
501        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
502        let subscription_id = self.id_provider.next_id();
503
504        let id = match subscription_id {
505            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
506            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
507        };
508        let mut filters = self.active_filters.inner.lock().await;
509        filters.insert(
510            id.clone(),
511            ActiveFilter {
512                block: last_poll_block_number,
513                last_poll_timestamp: Instant::now(),
514                kind,
515            },
516        );
517        Ok(id)
518    }
519
520    /// Returns all logs in the given _inclusive_ range that match the filter
521    ///
522    /// Returns an error if:
523    ///  - underlying database error
524    ///  - amount of matches exceeds configured limit
525    async fn get_logs_in_block_range(
526        self: Arc<Self>,
527        filter: Filter,
528        from_block: u64,
529        to_block: u64,
530        limits: QueryLimits,
531    ) -> Result<Vec<Log>, EthFilterError> {
532        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
533
534        // perform boundary checks first
535        if to_block < from_block {
536            return Err(EthFilterError::InvalidBlockRangeParams)
537        }
538
539        if let Some(max_blocks_per_filter) =
540            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
541        {
542            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
543        }
544
545        let (tx, rx) = oneshot::channel();
546        let this = self.clone();
547        self.task_spawner.spawn_blocking(Box::pin(async move {
548            let res =
549                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
550            let _ = tx.send(res);
551        }));
552
553        rx.await.map_err(|_| EthFilterError::InternalError)?
554    }
555
556    /// Returns all logs in the given _inclusive_ range that match the filter
557    ///
558    /// Note: This function uses a mix of blocking db operations for fetching indices and header
559    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
560    /// This function is considered blocking and should thus be spawned on a blocking task.
561    ///
562    /// Returns an error if:
563    ///  - underlying database error
564    async fn get_logs_in_block_range_inner(
565        &self,
566        filter: &Filter,
567        from_block: u64,
568        to_block: u64,
569        limits: QueryLimits,
570    ) -> Result<Vec<Log>, EthFilterError> {
571        let mut all_logs = Vec::new();
572
573        // loop over the range of new blocks and check logs if the filter matches the log's bloom
574        // filter
575        for (from, to) in
576            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
577        {
578            let headers = self.provider().headers_range(from..=to)?;
579            for (idx, header) in headers
580                .iter()
581                .enumerate()
582                .filter(|(_, header)| filter.matches_bloom(header.logs_bloom()))
583            {
584                // these are consecutive headers, so we can use the parent hash of the next
585                // block to get the current header's hash
586                let block_hash = match headers.get(idx + 1) {
587                    Some(child) => child.parent_hash(),
588                    None => self
589                        .provider()
590                        .block_hash(header.number())?
591                        .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
592                };
593
594                let num_hash = BlockNumHash::new(header.number(), block_hash);
595                if let Some((receipts, maybe_block)) =
596                    self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
597                {
598                    append_matching_block_logs(
599                        &mut all_logs,
600                        maybe_block
601                            .map(ProviderOrBlock::Block)
602                            .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
603                        filter,
604                        num_hash,
605                        &receipts,
606                        false,
607                        header.timestamp(),
608                    )?;
609
610                    // size check but only if range is multiple blocks, so we always return all
611                    // logs of a single block
612                    let is_multi_block_range = from_block != to_block;
613                    if let Some(max_logs_per_response) = limits.max_logs_per_response {
614                        if is_multi_block_range && all_logs.len() > max_logs_per_response {
615                            return Err(EthFilterError::QueryExceedsMaxResults {
616                                max_logs: max_logs_per_response,
617                                from_block,
618                                to_block: num_hash.number.saturating_sub(1),
619                            });
620                        }
621                    }
622                }
623            }
624        }
625
626        Ok(all_logs)
627    }
628}
629
630/// All active filters
631#[derive(Debug, Clone, Default)]
632pub struct ActiveFilters<T> {
633    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
634}
635
636impl<T> ActiveFilters<T> {
637    /// Returns an empty instance.
638    pub fn new() -> Self {
639        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
640    }
641}
642
/// An installed filter
///
/// Tracks the polling state of a single filter next to what the filter reports.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// At which block the filter was polled last.
    ///
    /// Set to the best block number on installation; a poll that fetches changes advances it
    /// to the best block number plus one.
    block: u64,
    /// Last time this filter was polled.
    ///
    /// Compared against `stale_filter_ttl` when stale filters are cleared.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
653
654/// A receiver for pending transactions that returns all new transactions since the last poll.
655#[derive(Debug, Clone)]
656struct PendingTransactionsReceiver {
657    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
658}
659
660impl PendingTransactionsReceiver {
661    fn new(receiver: Receiver<TxHash>) -> Self {
662        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
663    }
664
665    /// Returns all new pending transactions received since the last poll.
666    async fn drain<T>(&self) -> FilterChanges<T> {
667        let mut pending_txs = Vec::new();
668        let mut prepared_stream = self.txs_receiver.lock().await;
669
670        while let Ok(tx_hash) = prepared_stream.try_recv() {
671            pending_txs.push(tx_hash);
672        }
673
674        // Convert the vector of hashes into FilterChanges::Hashes
675        FilterChanges::Hashes(pending_txs)
676    }
677}
678
679/// A structure to manage and provide access to a stream of full transaction details.
680#[derive(Debug, Clone)]
681struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
682    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
683    tx_resp_builder: TxCompat,
684}
685
686impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
687where
688    T: PoolTransaction + 'static,
689    TxCompat: TransactionCompat<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
690{
691    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
692    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
693        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
694    }
695
696    /// Returns all new pending transactions received since the last poll.
697    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
698        let mut pending_txs = Vec::new();
699        let mut prepared_stream = self.txs_stream.lock().await;
700
701        while let Ok(tx) = prepared_stream.try_recv() {
702            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
703                Ok(tx) => pending_txs.push(tx),
704                Err(err) => {
705                    error!(target: "rpc",
706                        %err,
707                        "Failed to fill txn with block context"
708                    );
709                }
710            }
711        }
712        FilterChanges::Transactions(pending_txs)
713    }
714}
715
716/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
717#[async_trait]
718trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
719    async fn drain(&self) -> FilterChanges<T>;
720}
721
722#[async_trait]
723impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
724    for FullTransactionsReceiver<T, TxCompat>
725where
726    T: PoolTransaction + 'static,
727    TxCompat: TransactionCompat<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
728{
729    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
730        Self::drain(self).await
731    }
732}
733
734/// Represents the kind of pending transaction data that can be retrieved.
735///
736/// This enum differentiates between two kinds of pending transaction data:
737/// - Just the transaction hashes.
738/// - Full transaction details.
739#[derive(Debug, Clone)]
740enum PendingTransactionKind<T> {
741    Hashes(PendingTransactionsReceiver),
742    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
743}
744
745impl<T: 'static> PendingTransactionKind<T> {
746    async fn drain(&self) -> FilterChanges<T> {
747        match self {
748            Self::Hashes(receiver) => receiver.drain().await,
749            Self::FullTransaction(receiver) => receiver.drain().await,
750        }
751    }
752}
753
/// The kinds of filters that can be installed.
///
/// `T` is the transaction type reported by pending transaction filters.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// A log filter, installed via `eth_newFilter`.
    Log(Box<Filter>),
    /// A block filter, installed via `eth_newBlockFilter`.
    Block,
    /// A pending transaction filter, installed via `eth_newPendingTransactionFilter`.
    PendingTransaction(PendingTransactionKind<T>),
}
760
/// An iterator that yields _inclusive_ block ranges of a given step size
///
/// Every item is a `(start, end)` pair with `end - start <= step`, so a chunk covers at most
/// `step + 1` blocks. Consecutive chunks are adjacent (`next.start == prev.end + 1`) and the
/// last chunk ends exactly at the end of the overall range. An empty range yields nothing.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block of the overall range; no chunk extends past it.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator that splits `range` into chunks spanning at most `step + 1` blocks.
    ///
    /// A `step` so large that `step + 1` does not fit into `usize` is clamped to
    /// `usize::MAX - 1`; this only makes chunks smaller than requested.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // distance between two consecutive chunk starts; computed without overflow or
        // truncation and always at least 1, as required by `step_by`
        let stride =
            usize::try_from(step).ok().and_then(|s| s.checked_add(1)).unwrap_or(usize::MAX);
        // keep the chunk length in sync with the (possibly clamped) stride; widening
        // `usize -> u64` is lossless on targets where `usize` is at most 64 bits
        let step = (stride - 1) as u64;
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // saturate so that chunks close to `u64::MAX` cannot overflow; `start` comes from the
        // range and is therefore never past `self.end`, so the chunk is never inverted
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
787
/// Errors that can occur in the handler implementation
///
/// The conversion into a json-rpc error object maps [`Self::InternalError`] to the internal
/// error code, forwards [`Self::EthAPIError`] to the conversion of the wrapped error and maps
/// all other variants to the invalid params code.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// Filter not found.
    ///
    /// Also returned when a log filter is requested but the id belongs to another kind of
    /// filter.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// Invalid block range.
    ///
    /// Returned when the end of the requested range is lower than its start.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// Query scope is too broad.
    ///
    /// Carries the configured maximum number of blocks per filter.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// Query result is too large.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response
        max_logs: usize,
        /// Start block of the suggested retry range
        from_block: u64,
        /// End block of the suggested retry range (last successfully processed block)
        to_block: u64,
    },
    /// Error serving request in `eth_` namespace.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Error thrown when a spawned task failed to deliver a response.
    #[error("internal filter error")]
    InternalError,
}
817
818impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
819    fn from(err: EthFilterError) -> Self {
820        match err {
821            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
822                jsonrpsee::types::error::INVALID_PARAMS_CODE,
823                "filter not found",
824            ),
825            err @ EthFilterError::InternalError => {
826                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
827            }
828            EthFilterError::EthAPIError(err) => err.into(),
829            err @ (EthFilterError::InvalidBlockRangeParams |
830            EthFilterError::QueryExceedsMaxBlocks(_) |
831            EthFilterError::QueryExceedsMaxResults { .. }) => {
832                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
833            }
834        }
835    }
836}
837
838impl From<ProviderError> for EthFilterError {
839    fn from(err: ProviderError) -> Self {
840        Self::EthAPIError(err.into())
841    }
842}
843
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use reth_testing_utils::generators;

    /// Chunks of a random range must start at the range start, be adjacent to each other and
    /// end at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        let start = u64::from(rng.random::<u32>());
        let end = start.saturating_add(u64::from(rng.random::<u32>()));
        let step = u64::from(rng.random::<u16>());

        let mut chunks = BlockRangeInclusiveIter::new(start..=end, step);

        let (first_from, first_to) = chunks.next().unwrap();
        assert_eq!(first_from, start);
        assert_eq!(first_to, (first_from + step).min(end));

        let last_to = chunks.fold(first_to, |prev_to, (next_from, next_to)| {
            // ensure range starts with previous end + 1
            assert_eq!(next_from, prev_to + 1);
            next_to
        });

        assert_eq!(last_to, end);
    }
}
871}