reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::future::TryFutureExt;
11use jsonrpsee::{core::RpcResult, server::IdProvider};
12use reth_errors::ProviderError;
13use reth_rpc_eth_api::{
14    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcNodeCoreExt,
15    RpcTransaction, TransactionCompat,
16};
17use reth_rpc_eth_types::{
18    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
19    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
20};
21use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
22use reth_storage_api::{
23    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
24    ProviderReceipt,
25};
26use reth_tasks::TaskSpawner;
27use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
28use std::{
29    collections::HashMap,
30    fmt,
31    future::Future,
32    iter::StepBy,
33    ops::RangeInclusive,
34    sync::Arc,
35    time::{Duration, Instant},
36};
37use tokio::{
38    sync::{mpsc::Receiver, Mutex},
39    time::MissedTickBehavior,
40};
41use tracing::{error, trace};
42
43impl<Eth> EngineEthFilter for EthFilter<Eth>
44where
45    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
46{
47    /// Returns logs matching given filter object, no query limits
48    fn logs(
49        &self,
50        filter: Filter,
51        limits: QueryLimits,
52    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
53        trace!(target: "rpc::eth", "Serving eth_getLogs");
54        self.inner.logs_for_filter(filter, limits).map_err(|e| e.into())
55    }
56}
57
/// Upper bound on the number of headers read in one batch while serving a range filter.
///
/// With ~530 bytes per header this amounts to roughly 500kb per batch.
const MAX_HEADERS_RANGE: u64 = 1_000;
60
61/// `Eth` filter RPC implementation.
62///
63/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
64pub struct EthFilter<Eth: EthApiTypes> {
65    /// All nested fields bundled together
66    inner: Arc<EthFilterInner<Eth>>,
67}
68
69impl<Eth> Clone for EthFilter<Eth>
70where
71    Eth: EthApiTypes,
72{
73    fn clone(&self) -> Self {
74        Self { inner: self.inner.clone() }
75    }
76}
77
78impl<Eth> EthFilter<Eth>
79where
80    Eth: EthApiTypes + 'static,
81{
82    /// Creates a new, shareable instance.
83    ///
84    /// This uses the given pool to get notified about new transactions, the provider to interact
85    /// with the blockchain, the cache to fetch cacheable data, like the logs.
86    ///
87    /// See also [`EthFilterConfig`].
88    ///
89    /// This also spawns a task that periodically clears stale filters.
90    ///
91    /// # Create a new instance with [`EthApi`](crate::EthApi)
92    ///
93    /// ```no_run
94    /// use reth_evm_ethereum::EthEvmConfig;
95    /// use reth_network_api::noop::NoopNetwork;
96    /// use reth_provider::noop::NoopProvider;
97    /// use reth_rpc::{EthApi, EthFilter};
98    /// use reth_tasks::TokioTaskExecutor;
99    /// use reth_transaction_pool::noop::NoopTransactionPool;
100    /// let eth_api = EthApi::builder(
101    ///     NoopProvider::default(),
102    ///     NoopTransactionPool::default(),
103    ///     NoopNetwork::default(),
104    ///     EthEvmConfig::mainnet(),
105    /// )
106    /// .build();
107    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
108    /// ```
109    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
110        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
111            config;
112        let inner = EthFilterInner {
113            eth_api,
114            active_filters: ActiveFilters::new(),
115            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
116            max_headers_range: MAX_HEADERS_RANGE,
117            task_spawner,
118            stale_filter_ttl,
119            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
120        };
121
122        let eth_filter = Self { inner: Arc::new(inner) };
123
124        let this = eth_filter.clone();
125        eth_filter.inner.task_spawner.spawn_critical(
126            "eth-filters_stale-filters-clean",
127            Box::pin(async move {
128                this.watch_and_clear_stale_filters().await;
129            }),
130        );
131
132        eth_filter
133    }
134
135    /// Returns all currently active filters
136    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
137        &self.inner.active_filters
138    }
139
140    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
141    /// Nonetheless, this endless future frees the thread at every await point.
142    async fn watch_and_clear_stale_filters(&self) {
143        let mut interval = tokio::time::interval_at(
144            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
145            self.inner.stale_filter_ttl,
146        );
147        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
148        loop {
149            interval.tick().await;
150            self.clear_stale_filters(Instant::now()).await;
151        }
152    }
153
154    /// Clears all filters that have not been polled for longer than the configured
155    /// `stale_filter_ttl` at the given instant.
156    pub async fn clear_stale_filters(&self, now: Instant) {
157        trace!(target: "rpc::eth", "clear stale filters");
158        self.active_filters().inner.lock().await.retain(|id, filter| {
159            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
160
161            if !is_valid {
162                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
163            }
164
165            is_valid
166        })
167    }
168}
169
170impl<Eth> EthFilter<Eth>
171where
172    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
173{
174    /// Access the underlying provider.
175    fn provider(&self) -> &Eth::Provider {
176        self.inner.eth_api.provider()
177    }
178
179    /// Access the underlying pool.
180    fn pool(&self) -> &Eth::Pool {
181        self.inner.eth_api.pool()
182    }
183
184    /// Returns all the filter changes for the given id, if any
185    pub async fn filter_changes(
186        &self,
187        id: FilterId,
188    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
189        let info = self.provider().chain_info()?;
190        let best_number = info.best_number;
191
192        // start_block is the block from which we should start fetching changes, the next block from
193        // the last time changes were polled, in other words the best block at last poll + 1
194        let (start_block, kind) = {
195            let mut filters = self.inner.active_filters.inner.lock().await;
196            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
197
198            if filter.block > best_number {
199                // no new blocks since the last poll
200                return Ok(FilterChanges::Empty)
201            }
202
203            // update filter
204            // we fetch all changes from [filter.block..best_block], so we advance the filter's
205            // block to `best_block +1`, the next from which we should start fetching changes again
206            let mut block = best_number + 1;
207            std::mem::swap(&mut filter.block, &mut block);
208            filter.last_poll_timestamp = Instant::now();
209
210            (block, filter.kind.clone())
211        };
212
213        match kind {
214            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
215            FilterKind::Block => {
216                // Note: we need to fetch the block hashes from inclusive range
217                // [start_block..best_block]
218                let end_block = best_number + 1;
219                let block_hashes =
220                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
221                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
222                    )?;
223                Ok(FilterChanges::Hashes(block_hashes))
224            }
225            FilterKind::Log(filter) => {
226                let (from_block_number, to_block_number) = match filter.block_option {
227                    FilterBlockOption::Range { from_block, to_block } => {
228                        let from = from_block
229                            .map(|num| self.provider().convert_block_number(num))
230                            .transpose()?
231                            .flatten();
232                        let to = to_block
233                            .map(|num| self.provider().convert_block_number(num))
234                            .transpose()?
235                            .flatten();
236                        logs_utils::get_filter_block_range(from, to, start_block, info)
237                    }
238                    FilterBlockOption::AtBlockHash(_) => {
239                        // blockHash is equivalent to fromBlock = toBlock = the block number with
240                        // hash blockHash
241                        // get_logs_in_block_range is inclusive
242                        (start_block, best_number)
243                    }
244                };
245                let logs = self
246                    .inner
247                    .get_logs_in_block_range(
248                        &filter,
249                        from_block_number,
250                        to_block_number,
251                        self.inner.query_limits,
252                    )
253                    .await?;
254                Ok(FilterChanges::Logs(logs))
255            }
256        }
257    }
258
259    /// Returns an array of all logs matching filter with given id.
260    ///
261    /// Returns an error if no matching log filter exists.
262    ///
263    /// Handler for `eth_getFilterLogs`
264    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
265        let filter = {
266            let filters = self.inner.active_filters.inner.lock().await;
267            if let FilterKind::Log(ref filter) =
268                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
269            {
270                *filter.clone()
271            } else {
272                // Not a log filter
273                return Err(EthFilterError::FilterNotFound(id))
274            }
275        };
276
277        self.inner.logs_for_filter(filter, self.inner.query_limits).await
278    }
279}
280
281#[async_trait]
282impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
283where
284    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
285{
286    /// Handler for `eth_newFilter`
287    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
288        trace!(target: "rpc::eth", "Serving eth_newFilter");
289        self.inner
290            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
291            .await
292    }
293
294    /// Handler for `eth_newBlockFilter`
295    async fn new_block_filter(&self) -> RpcResult<FilterId> {
296        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
297        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
298    }
299
300    /// Handler for `eth_newPendingTransactionFilter`
301    async fn new_pending_transaction_filter(
302        &self,
303        kind: Option<PendingTransactionFilterKind>,
304    ) -> RpcResult<FilterId> {
305        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
306
307        let transaction_kind = match kind.unwrap_or_default() {
308            PendingTransactionFilterKind::Hashes => {
309                let receiver = self.pool().pending_transactions_listener();
310                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
311                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
312            }
313            PendingTransactionFilterKind::Full => {
314                let stream = self.pool().new_pending_pool_transactions_listener();
315                let full_txs_receiver = FullTransactionsReceiver::new(
316                    stream,
317                    self.inner.eth_api.tx_resp_builder().clone(),
318                );
319                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
320                    full_txs_receiver,
321                )))
322            }
323        };
324
325        //let filter = FilterKind::PendingTransaction(transaction_kind);
326
327        // Install the filter and propagate any errors
328        self.inner.install_filter(transaction_kind).await
329    }
330
331    /// Handler for `eth_getFilterChanges`
332    async fn filter_changes(
333        &self,
334        id: FilterId,
335    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
336        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
337        Ok(Self::filter_changes(self, id).await?)
338    }
339
340    /// Returns an array of all logs matching filter with given id.
341    ///
342    /// Returns an error if no matching log filter exists.
343    ///
344    /// Handler for `eth_getFilterLogs`
345    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
346        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
347        Ok(Self::filter_logs(self, id).await?)
348    }
349
350    /// Handler for `eth_uninstallFilter`
351    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
352        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
353        let mut filters = self.inner.active_filters.inner.lock().await;
354        if filters.remove(&id).is_some() {
355            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
356            Ok(true)
357        } else {
358            Ok(false)
359        }
360    }
361
362    /// Returns logs matching given filter object.
363    ///
364    /// Handler for `eth_getLogs`
365    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
366        trace!(target: "rpc::eth", "Serving eth_getLogs");
367        Ok(self.inner.logs_for_filter(filter, self.inner.query_limits).await?)
368    }
369}
370
371impl<Eth> std::fmt::Debug for EthFilter<Eth>
372where
373    Eth: EthApiTypes,
374{
375    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376        f.debug_struct("EthFilter").finish_non_exhaustive()
377    }
378}
379
380/// Container type `EthFilter`
381#[derive(Debug)]
382struct EthFilterInner<Eth: EthApiTypes> {
383    /// Inner `eth` API implementation.
384    eth_api: Eth,
385    /// All currently installed filters.
386    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
387    /// Provides ids to identify filters
388    id_provider: Arc<dyn IdProvider>,
389    /// limits for logs queries
390    query_limits: QueryLimits,
391    /// maximum number of headers to read at once for range filter
392    max_headers_range: u64,
393    /// The type that can spawn tasks.
394    task_spawner: Box<dyn TaskSpawner>,
395    /// Duration since the last filter poll, after which the filter is considered stale
396    stale_filter_ttl: Duration,
397}
398
399impl<Eth> EthFilterInner<Eth>
400where
401    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
402{
403    /// Access the underlying provider.
404    fn provider(&self) -> &Eth::Provider {
405        self.eth_api.provider()
406    }
407
408    /// Access the underlying [`EthStateCache`].
409    fn eth_cache(
410        &self,
411    ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
412        self.eth_api.cache()
413    }
414
415    /// Returns logs matching given filter object.
416    async fn logs_for_filter(
417        &self,
418        filter: Filter,
419        limits: QueryLimits,
420    ) -> Result<Vec<Log>, EthFilterError> {
421        match filter.block_option {
422            FilterBlockOption::AtBlockHash(block_hash) => {
423                // for all matching logs in the block
424                // get the block header with the hash
425                let header = self
426                    .provider()
427                    .header_by_hash_or_number(block_hash.into())?
428                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
429
430                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
431
432                // we also need to ensure that the receipts are available and return an error if
433                // not, in case the block hash been reorged
434                let (receipts, maybe_block) = self
435                    .eth_cache()
436                    .get_receipts_and_maybe_block(block_num_hash.hash)
437                    .await?
438                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
439
440                let mut all_logs = Vec::new();
441                append_matching_block_logs(
442                    &mut all_logs,
443                    maybe_block
444                        .map(ProviderOrBlock::Block)
445                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
446                    &FilteredParams::new(Some(filter)),
447                    block_num_hash,
448                    &receipts,
449                    false,
450                    header.timestamp(),
451                )?;
452
453                Ok(all_logs)
454            }
455            FilterBlockOption::Range { from_block, to_block } => {
456                // compute the range
457                let info = self.provider().chain_info()?;
458
459                // we start at the most recent block if unset in filter
460                let start_block = info.best_number;
461                let from = from_block
462                    .map(|num| self.provider().convert_block_number(num))
463                    .transpose()?
464                    .flatten();
465                let to = to_block
466                    .map(|num| self.provider().convert_block_number(num))
467                    .transpose()?
468                    .flatten();
469                let (from_block_number, to_block_number) =
470                    logs_utils::get_filter_block_range(from, to, start_block, info);
471                self.get_logs_in_block_range(&filter, from_block_number, to_block_number, limits)
472                    .await
473            }
474        }
475    }
476
477    /// Installs a new filter and returns the new identifier.
478    async fn install_filter(
479        &self,
480        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
481    ) -> RpcResult<FilterId> {
482        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
483        let id = FilterId::from(self.id_provider.next_id());
484        let mut filters = self.active_filters.inner.lock().await;
485        filters.insert(
486            id.clone(),
487            ActiveFilter {
488                block: last_poll_block_number,
489                last_poll_timestamp: Instant::now(),
490                kind,
491            },
492        );
493        Ok(id)
494    }
495
496    /// Returns all logs in the given _inclusive_ range that match the filter
497    ///
498    /// Returns an error if:
499    ///  - underlying database error
500    ///  - amount of matches exceeds configured limit
501    async fn get_logs_in_block_range(
502        &self,
503        filter: &Filter,
504        from_block: u64,
505        to_block: u64,
506        limits: QueryLimits,
507    ) -> Result<Vec<Log>, EthFilterError> {
508        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
509
510        if to_block < from_block {
511            return Err(EthFilterError::InvalidBlockRangeParams)
512        }
513
514        if let Some(max_blocks_per_filter) =
515            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
516        {
517            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
518        }
519
520        let mut all_logs = Vec::new();
521        let filter_params = FilteredParams::new(Some(filter.clone()));
522
523        // derive bloom filters from filter input, so we can check headers for matching logs
524        let address_filter = FilteredParams::address_filter(&filter.address);
525        let topics_filter = FilteredParams::topics_filter(&filter.topics);
526
527        // loop over the range of new blocks and check logs if the filter matches the log's bloom
528        // filter
529        for (from, to) in
530            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
531        {
532            let headers = self.provider().headers_range(from..=to)?;
533
534            for (idx, header) in headers.iter().enumerate() {
535                // only if filter matches
536                if FilteredParams::matches_address(header.logs_bloom(), &address_filter) &&
537                    FilteredParams::matches_topics(header.logs_bloom(), &topics_filter)
538                {
539                    // these are consecutive headers, so we can use the parent hash of the next
540                    // block to get the current header's hash
541                    let block_hash = match headers.get(idx + 1) {
542                        Some(parent) => parent.parent_hash(),
543                        None => self
544                            .provider()
545                            .block_hash(header.number())?
546                            .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
547                    };
548
549                    let num_hash = BlockNumHash::new(header.number(), block_hash);
550                    if let Some((receipts, maybe_block)) =
551                        self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
552                    {
553                        append_matching_block_logs(
554                            &mut all_logs,
555                            maybe_block
556                                .map(ProviderOrBlock::Block)
557                                .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
558                            &filter_params,
559                            num_hash,
560                            &receipts,
561                            false,
562                            header.timestamp(),
563                        )?;
564
565                        // size check but only if range is multiple blocks, so we always return all
566                        // logs of a single block
567                        let is_multi_block_range = from_block != to_block;
568                        if let Some(max_logs_per_response) = limits.max_logs_per_response {
569                            if is_multi_block_range && all_logs.len() > max_logs_per_response {
570                                return Err(EthFilterError::QueryExceedsMaxResults {
571                                    max_logs: max_logs_per_response,
572                                    from_block,
573                                    to_block: num_hash.number.saturating_sub(1),
574                                });
575                            }
576                        }
577                    }
578                }
579            }
580        }
581
582        Ok(all_logs)
583    }
584}
585
586/// All active filters
587#[derive(Debug, Clone, Default)]
588pub struct ActiveFilters<T> {
589    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
590}
591
592impl<T> ActiveFilters<T> {
593    /// Returns an empty instance.
594    pub fn new() -> Self {
595        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
596    }
597}
598
599/// An installed filter
600#[derive(Debug)]
601struct ActiveFilter<T> {
602    /// At which block the filter was polled last.
603    block: u64,
604    /// Last time this filter was polled.
605    last_poll_timestamp: Instant,
606    /// What kind of filter it is.
607    kind: FilterKind<T>,
608}
609
610/// A receiver for pending transactions that returns all new transactions since the last poll.
611#[derive(Debug, Clone)]
612struct PendingTransactionsReceiver {
613    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
614}
615
616impl PendingTransactionsReceiver {
617    fn new(receiver: Receiver<TxHash>) -> Self {
618        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
619    }
620
621    /// Returns all new pending transactions received since the last poll.
622    async fn drain<T>(&self) -> FilterChanges<T> {
623        let mut pending_txs = Vec::new();
624        let mut prepared_stream = self.txs_receiver.lock().await;
625
626        while let Ok(tx_hash) = prepared_stream.try_recv() {
627            pending_txs.push(tx_hash);
628        }
629
630        // Convert the vector of hashes into FilterChanges::Hashes
631        FilterChanges::Hashes(pending_txs)
632    }
633}
634
635/// A structure to manage and provide access to a stream of full transaction details.
636#[derive(Debug, Clone)]
637struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
638    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
639    tx_resp_builder: TxCompat,
640}
641
642impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
643where
644    T: PoolTransaction + 'static,
645    TxCompat: TransactionCompat<T::Consensus>,
646{
647    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
648    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
649        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
650    }
651
652    /// Returns all new pending transactions received since the last poll.
653    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
654        let mut pending_txs = Vec::new();
655        let mut prepared_stream = self.txs_stream.lock().await;
656
657        while let Ok(tx) = prepared_stream.try_recv() {
658            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
659                Ok(tx) => pending_txs.push(tx),
660                Err(err) => {
661                    error!(target: "rpc",
662                        %err,
663                        "Failed to fill txn with block context"
664                    );
665                }
666            }
667        }
668        FilterChanges::Transactions(pending_txs)
669    }
670}
671
672/// Helper trait for [FullTransactionsReceiver] to erase the `Transaction` type.
673#[async_trait]
674trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
675    async fn drain(&self) -> FilterChanges<T>;
676}
677
678#[async_trait]
679impl<T, TxCompat> FullTransactionsFilter<TxCompat::Transaction>
680    for FullTransactionsReceiver<T, TxCompat>
681where
682    T: PoolTransaction + 'static,
683    TxCompat: TransactionCompat<T::Consensus> + 'static,
684{
685    async fn drain(&self) -> FilterChanges<TxCompat::Transaction> {
686        Self::drain(self).await
687    }
688}
689
690/// Represents the kind of pending transaction data that can be retrieved.
691///
692/// This enum differentiates between two kinds of pending transaction data:
693/// - Just the transaction hashes.
694/// - Full transaction details.
695#[derive(Debug, Clone)]
696enum PendingTransactionKind<T> {
697    Hashes(PendingTransactionsReceiver),
698    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
699}
700
701impl<T: 'static> PendingTransactionKind<T> {
702    async fn drain(&self) -> FilterChanges<T> {
703        match self {
704            Self::Hashes(receiver) => receiver.drain().await,
705            Self::FullTransaction(receiver) => receiver.drain().await,
706        }
707    }
708}
709
710#[derive(Clone, Debug)]
711enum FilterKind<T> {
712    Log(Box<Filter>),
713    Block,
714    PendingTransaction(PendingTransactionKind<T>),
715}
716
/// An iterator that yields _inclusive_ block ranges of a given step size
///
/// Every yielded `(start, end)` pair covers at most `step + 1` blocks, consecutive pairs are
/// adjacent (`next_start == end + 1`) and the last pair ends exactly at the end of the range.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block of the overall range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Splits `range` into inclusive sub-ranges that span `step + 1` blocks each.
    ///
    /// A `step` of `0` yields one sub-range per block.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // The stride between two sub-range starts is `step + 1`. Saturate instead of overflowing
        // when `step` does not fit into `usize` or is `usize::MAX`.
        let stride =
            usize::try_from(step).ok().and_then(|s| s.checked_add(1)).unwrap_or(usize::MAX);
        // Derive the stored step from the effective stride so both always agree; for every
        // `step` that does not saturate this is exactly `step`.
        let step = (stride - 1) as u64;
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start` never exceeds `self.end`, so after saturating and clamping `end >= start`
        // always holds; a plain `start + self.step` could overflow close to `u64::MAX`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
743
744/// Errors that can occur in the handler implementation
745#[derive(Debug, thiserror::Error)]
746pub enum EthFilterError {
747    /// Filter not found.
748    #[error("filter not found")]
749    FilterNotFound(FilterId),
750    /// Invalid block range.
751    #[error("invalid block range params")]
752    InvalidBlockRangeParams,
753    /// Query scope is too broad.
754    #[error("query exceeds max block range {0}")]
755    QueryExceedsMaxBlocks(u64),
756    /// Query result is too large.
757    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
758    QueryExceedsMaxResults {
759        /// Maximum number of logs allowed per response
760        max_logs: usize,
761        /// Start block of the suggested retry range
762        from_block: u64,
763        /// End block of the suggested retry range (last successfully processed block)
764        to_block: u64,
765    },
766    /// Error serving request in `eth_` namespace.
767    #[error(transparent)]
768    EthAPIError(#[from] EthApiError),
769    /// Error thrown when a spawned task failed to deliver a response.
770    #[error("internal filter error")]
771    InternalError,
772}
773
774impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
775    fn from(err: EthFilterError) -> Self {
776        match err {
777            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
778                jsonrpsee::types::error::INVALID_PARAMS_CODE,
779                "filter not found",
780            ),
781            err @ EthFilterError::InternalError => {
782                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
783            }
784            EthFilterError::EthAPIError(err) => err.into(),
785            err @ (EthFilterError::InvalidBlockRangeParams |
786            EthFilterError::QueryExceedsMaxBlocks(_) |
787            EthFilterError::QueryExceedsMaxResults { .. }) => {
788                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
789            }
790        }
791    }
792}
793
794impl From<ProviderError> for EthFilterError {
795    fn from(err: ProviderError) -> Self {
796        Self::EthAPIError(err.into())
797    }
798}
799
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use reth_testing_utils::generators;

    /// Checks on a random range and step that the yielded sub-ranges start at the range start,
    /// are adjacent to each other and end at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        let start = rng.random::<u32>() as u64;
        let end = start.saturating_add(rng.random::<u32>() as u64);
        let step = rng.random::<u16>() as u64;

        let mut ranges = BlockRangeInclusiveIter::new(start..=end, step);

        let (first_from, first_to) = ranges.next().unwrap();
        assert_eq!(first_from, start);
        assert_eq!(first_to, (first_from + step).min(end));

        let last_to = ranges.fold(first_to, |previous_to, (next_from, next_to)| {
            // ensure range starts with previous end + 1
            assert_eq!(next_from, previous_to + 1);
            next_to
        });

        assert_eq!(last_to, end);
    }
}