Skip to main content

reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8    PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12    future::TryFutureExt,
13    stream::{FuturesOrdered, StreamExt},
14    Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21    helpers::{EthBlocks, LoadReceipt},
22    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23    RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32    ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::Runtime;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37    collections::{HashMap, VecDeque},
38    fmt,
39    iter::{Peekable, StepBy},
40    ops::RangeInclusive,
41    pin::Pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::{
46    sync::{mpsc::Receiver, oneshot, Mutex},
47    time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53    Eth: FullEthApiTypes
54        + RpcNodeCoreExt<Provider: BlockIdReader>
55        + LoadReceipt
56        + EthBlocks
57        + 'static,
58{
59    /// Returns logs matching given filter object, no query limits
60    fn logs(
61        &self,
62        filter: Filter,
63        limits: QueryLimits,
64    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65        trace!(target: "rpc::eth", "Serving eth_getLogs");
66        self.logs_for_filter(filter, limits).map_err(|e| e.into())
67    }
68}
69
70/// Threshold for deciding between cached and range mode processing
71const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
72
73/// Threshold for bloom filter matches that triggers reduced caching
74const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
75
76/// Threshold for bloom filter matches that triggers moderately reduced caching
77const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
78
79/// Minimum block count to apply bloom filter match adjustments
80const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;
81
82/// The maximum number of headers we read at once when handling a range filter.
83const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
84
85/// Threshold for enabling parallel processing in range mode
86const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;
87
88/// Default concurrency for parallel processing
89const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91/// `Eth` filter RPC implementation.
92///
93/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
94pub struct EthFilter<Eth: EthApiTypes> {
95    /// All nested fields bundled together
96    inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101    Eth: EthApiTypes,
102{
103    fn clone(&self) -> Self {
104        Self { inner: self.inner.clone() }
105    }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110    Eth: EthApiTypes + 'static,
111{
112    /// Creates a new, shareable instance.
113    ///
114    /// This uses the given pool to get notified about new transactions, the provider to interact
115    /// with the blockchain, the cache to fetch cacheable data, like the logs.
116    ///
117    /// See also [`EthFilterConfig`].
118    ///
119    /// This also spawns a task that periodically clears stale filters.
120    ///
121    /// # Create a new instance with [`EthApi`](crate::EthApi)
122    ///
123    /// ```no_run
124    /// use reth_evm_ethereum::EthEvmConfig;
125    /// use reth_network_api::noop::NoopNetwork;
126    /// use reth_provider::noop::NoopProvider;
127    /// use reth_rpc::{EthApi, EthFilter};
128    /// use reth_tasks::Runtime;
129    /// use reth_transaction_pool::noop::NoopTransactionPool;
130    /// let eth_api = EthApi::builder(
131    ///     NoopProvider::default(),
132    ///     NoopTransactionPool::default(),
133    ///     NoopNetwork::default(),
134    ///     EthEvmConfig::mainnet(),
135    /// )
136    /// .build();
137    /// let filter = EthFilter::new(eth_api, Default::default(), Runtime::test());
138    /// ```
139    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self {
140        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141            config;
142        let inner = EthFilterInner {
143            eth_api,
144            active_filters: ActiveFilters::new(),
145            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146            max_headers_range: MAX_HEADERS_RANGE,
147            task_spawner,
148            stale_filter_ttl,
149            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150        };
151
152        let eth_filter = Self { inner: Arc::new(inner) };
153
154        let this = eth_filter.clone();
155        eth_filter.inner.task_spawner.spawn_critical_task(
156            "eth-filters_stale-filters-clean",
157            async move {
158                this.watch_and_clear_stale_filters().await;
159            },
160        );
161
162        eth_filter
163    }
164
165    /// Returns all currently active filters
166    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167        &self.inner.active_filters
168    }
169
170    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
171    /// Nonetheless, this endless future frees the thread at every await point.
172    async fn watch_and_clear_stale_filters(&self) {
173        let mut interval = tokio::time::interval_at(
174            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175            self.inner.stale_filter_ttl,
176        );
177        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178        loop {
179            interval.tick().await;
180            self.clear_stale_filters(Instant::now()).await;
181        }
182    }
183
184    /// Clears all filters that have not been polled for longer than the configured
185    /// `stale_filter_ttl` at the given instant.
186    pub async fn clear_stale_filters(&self, now: Instant) {
187        trace!(target: "rpc::eth", "clear stale filters");
188        let mut filters = self.active_filters().inner.lock().await;
189        filters.retain(|id, filter| {
190            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192            if !is_valid {
193                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194            }
195
196            is_valid
197        });
198        filters.shrink_to_fit();
199    }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205        + RpcNodeCoreExt
206        + LoadReceipt
207        + EthBlocks
208        + 'static,
209{
210    /// Access the underlying provider.
211    fn provider(&self) -> &Eth::Provider {
212        self.inner.eth_api.provider()
213    }
214
215    /// Access the underlying pool.
216    fn pool(&self) -> &Eth::Pool {
217        self.inner.eth_api.pool()
218    }
219
220    /// Returns all the filter changes for the given id, if any
221    pub async fn filter_changes(
222        &self,
223        id: FilterId,
224    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225        let info = self.provider().chain_info()?;
226        let best_number = info.best_number;
227
228        // start_block is the block from which we should start fetching changes, the next block from
229        // the last time changes were polled, in other words the best block at last poll + 1
230        let (start_block, kind) = {
231            let mut filters = self.inner.active_filters.inner.lock().await;
232            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234            if filter.block > best_number {
235                // no new blocks since the last poll
236                return Ok(FilterChanges::Empty)
237            }
238
239            // update filter
240            // we fetch all changes from [filter.block..best_block], so we advance the filter's
241            // block to `best_block +1`, the next from which we should start fetching changes again
242            let mut block = best_number + 1;
243            std::mem::swap(&mut filter.block, &mut block);
244            filter.last_poll_timestamp = Instant::now();
245
246            (block, filter.kind.clone())
247        };
248
249        match kind {
250            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251            FilterKind::Block => {
252                // Note: we need to fetch the block hashes from inclusive range
253                // [start_block..best_block]
254                let end_block = best_number + 1;
255                let block_hashes =
256                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
257                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258                    )?;
259                Ok(FilterChanges::Hashes(block_hashes))
260            }
261            FilterKind::Log(filter) => {
262                let (from_block_number, to_block_number) = match filter.block_option {
263                    FilterBlockOption::Range { from_block, to_block } => {
264                        let from = from_block
265                            .map(|num| self.provider().convert_block_number(num))
266                            .transpose()?
267                            .flatten();
268                        let to = to_block
269                            .map(|num| self.provider().convert_block_number(num))
270                            .transpose()?
271                            .flatten();
272                        logs_utils::get_filter_block_range(from, to, start_block, info)?
273                    }
274                    FilterBlockOption::AtBlockHash(block_hash) => {
275                        // blockHash is equivalent to fromBlock = toBlock = the block number with
276                        // hash blockHash
277                        // get_logs_in_block_range is inclusive
278                        let block_number = self
279                            .provider()
280                            .block_number(block_hash)?
281                            .ok_or(ProviderError::HeaderNotFound(block_hash.into()))?;
282                        (block_number, block_number)
283                    }
284                };
285                let logs = self
286                    .inner
287                    .clone()
288                    .get_logs_in_block_range(
289                        *filter,
290                        from_block_number,
291                        to_block_number,
292                        self.inner.query_limits,
293                    )
294                    .await?;
295                Ok(FilterChanges::Logs(logs))
296            }
297        }
298    }
299
300    /// Returns an array of all logs matching filter with given id.
301    ///
302    /// Returns an error if no matching log filter exists.
303    ///
304    /// Handler for `eth_getFilterLogs`
305    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
306        let filter = {
307            let mut filters = self.inner.active_filters.inner.lock().await;
308            let filter =
309                filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
310            if let FilterKind::Log(ref inner_filter) = filter.kind {
311                filter.last_poll_timestamp = Instant::now();
312                *inner_filter.clone()
313            } else {
314                // Not a log filter
315                return Err(EthFilterError::FilterNotFound(id))
316            }
317        };
318
319        self.logs_for_filter(filter, self.inner.query_limits).await
320    }
321
322    /// Returns logs matching given filter object.
323    async fn logs_for_filter(
324        &self,
325        filter: Filter,
326        limits: QueryLimits,
327    ) -> Result<Vec<Log>, EthFilterError> {
328        self.inner.clone().logs_for_filter(filter, limits).await
329    }
330}
331
332#[async_trait]
333impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
334where
335    Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
336{
337    /// Handler for `eth_newFilter`
338    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
339        trace!(target: "rpc::eth", "Serving eth_newFilter");
340        self.inner
341            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
342            .await
343    }
344
345    /// Handler for `eth_newBlockFilter`
346    async fn new_block_filter(&self) -> RpcResult<FilterId> {
347        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
348        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
349    }
350
351    /// Handler for `eth_newPendingTransactionFilter`
352    async fn new_pending_transaction_filter(
353        &self,
354        kind: Option<PendingTransactionFilterKind>,
355    ) -> RpcResult<FilterId> {
356        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
357
358        let transaction_kind = match kind.unwrap_or_default() {
359            PendingTransactionFilterKind::Hashes => {
360                let receiver = self.pool().pending_transactions_listener();
361                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
362                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
363            }
364            PendingTransactionFilterKind::Full => {
365                let stream = self.pool().new_pending_pool_transactions_listener();
366                let full_txs_receiver = FullTransactionsReceiver::new(
367                    stream,
368                    dyn_clone::clone(self.inner.eth_api.converter()),
369                );
370                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
371                    full_txs_receiver,
372                )))
373            }
374        };
375
376        // Install the filter and propagate any errors
377        self.inner.install_filter(transaction_kind).await
378    }
379
380    /// Handler for `eth_getFilterChanges`
381    async fn filter_changes(
382        &self,
383        id: FilterId,
384    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
385        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
386        Ok(Self::filter_changes(self, id).await?)
387    }
388
389    /// Returns an array of all logs matching filter with given id.
390    ///
391    /// Returns an error if no matching log filter exists.
392    ///
393    /// Handler for `eth_getFilterLogs`
394    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
395        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
396        Ok(Self::filter_logs(self, id).await?)
397    }
398
399    /// Handler for `eth_uninstallFilter`
400    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
401        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
402        let mut filters = self.inner.active_filters.inner.lock().await;
403        if filters.remove(&id).is_some() {
404            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
405            Ok(true)
406        } else {
407            Ok(false)
408        }
409    }
410
411    /// Returns logs matching given filter object.
412    ///
413    /// Handler for `eth_getLogs`
414    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
415        trace!(target: "rpc::eth", "Serving eth_getLogs");
416        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
417    }
418}
419
420impl<Eth> std::fmt::Debug for EthFilter<Eth>
421where
422    Eth: EthApiTypes,
423{
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        f.debug_struct("EthFilter").finish_non_exhaustive()
426    }
427}
428
429/// Container type `EthFilter`
430#[derive(Debug)]
431struct EthFilterInner<Eth: EthApiTypes> {
432    /// Inner `eth` API implementation.
433    eth_api: Eth,
434    /// All currently installed filters.
435    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
436    /// Provides ids to identify filters
437    id_provider: Arc<dyn IdProvider>,
438    /// limits for logs queries
439    query_limits: QueryLimits,
440    /// maximum number of headers to read at once for range filter
441    max_headers_range: u64,
442    /// The type that can spawn tasks.
443    task_spawner: Runtime,
444    /// Duration since the last filter poll, after which the filter is considered stale
445    stale_filter_ttl: Duration,
446}
447
448impl<Eth> EthFilterInner<Eth>
449where
450    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
451        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
452        + LoadReceipt
453        + EthBlocks
454        + 'static,
455{
456    /// Access the underlying provider.
457    fn provider(&self) -> &Eth::Provider {
458        self.eth_api.provider()
459    }
460
461    /// Access the underlying [`EthStateCache`].
462    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
463        self.eth_api.cache()
464    }
465
466    /// Returns logs matching given filter object.
467    async fn logs_for_filter(
468        self: Arc<Self>,
469        filter: Filter,
470        limits: QueryLimits,
471    ) -> Result<Vec<Log>, EthFilterError> {
472        match filter.block_option {
473            FilterBlockOption::AtBlockHash(block_hash) => {
474                // First try to get cached block and receipts, as it's likely they're already cached
475                let Some((receipts, maybe_block)) =
476                    self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
477                else {
478                    return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
479                };
480
481                // Read number and timestamp from cached block or provider header
482                let (block_number, block_timestamp) = if let Some(block) = &maybe_block {
483                    (block.header().number(), block.header().timestamp())
484                } else {
485                    let header = self
486                        .provider()
487                        .header_by_hash_or_number(block_hash.into())?
488                        .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
489                    (header.number(), header.timestamp())
490                };
491
492                // Check if the block has been pruned (EIP-4444)
493                let earliest_block = self.provider().earliest_block_number()?;
494                if block_number < earliest_block {
495                    return Err(EthApiError::PrunedHistoryUnavailable.into());
496                }
497
498                let block_num_hash = BlockNumHash::new(block_number, block_hash);
499
500                let mut all_logs = Vec::new();
501                append_matching_block_logs(
502                    &mut all_logs,
503                    maybe_block
504                        .map(ProviderOrBlock::Block)
505                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
506                    &filter,
507                    block_num_hash,
508                    &receipts,
509                    false,
510                    block_timestamp,
511                )?;
512
513                Ok(all_logs)
514            }
515            FilterBlockOption::Range { from_block, to_block } => {
516                // Handle special case where from block is pending
517                if from_block.is_some_and(|b| b.is_pending()) {
518                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
519                    if !(to_block.is_pending() || to_block.is_number()) {
520                        // always empty range
521                        return Ok(Vec::new());
522                    }
523                    // Try to get pending block and receipts
524                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
525                        if let BlockNumberOrTag::Number(to_block) = to_block &&
526                            to_block < pending_block.block.number()
527                        {
528                            // this block range is empty based on the user input
529                            return Ok(Vec::new());
530                        }
531
532                        let info = self.provider().chain_info()?;
533                        if pending_block.block.number() > info.best_number {
534                            // only consider the pending block if it is ahead of the chain
535                            let mut all_logs = Vec::new();
536                            let timestamp = pending_block.block.timestamp();
537                            let block_num_hash = pending_block.block.num_hash();
538                            append_matching_block_logs(
539                                &mut all_logs,
540                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
541                                &filter,
542                                block_num_hash,
543                                &pending_block.receipts,
544                                false, // removed = false for pending blocks
545                                timestamp,
546                            )?;
547                            return Ok(all_logs);
548                        }
549                    }
550                }
551
552                let info = self.provider().chain_info()?;
553                let start_block = info.best_number;
554                let from = from_block
555                    .map(|num| self.provider().convert_block_number(num))
556                    .transpose()?
557                    .flatten();
558                let to = to_block
559                    .map(|num| self.provider().convert_block_number(num))
560                    .transpose()?
561                    .flatten();
562
563                // Return error if toBlock exceeds current head
564                if let Some(t) = to &&
565                    t > info.best_number
566                {
567                    return Err(EthFilterError::BlockRangeExceedsHead);
568                }
569
570                if let Some(f) = from &&
571                    f > info.best_number
572                {
573                    // start block higher than local head, can return empty
574                    return Ok(Vec::new());
575                }
576
577                let (from_block_number, to_block_number) =
578                    logs_utils::get_filter_block_range(from, to, start_block, info)?;
579
580                // Check if the requested range overlaps with pruned history (EIP-4444)
581                let earliest_block = self.provider().earliest_block_number()?;
582                if from_block_number < earliest_block {
583                    return Err(EthApiError::PrunedHistoryUnavailable.into());
584                }
585
586                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
587                    .await
588            }
589        }
590    }
591
592    /// Installs a new filter and returns the new identifier.
593    async fn install_filter(
594        &self,
595        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
596    ) -> RpcResult<FilterId> {
597        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
598        let subscription_id = self.id_provider.next_id();
599
600        let id = match subscription_id {
601            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
602            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
603        };
604        let mut filters = self.active_filters.inner.lock().await;
605        filters.insert(
606            id.clone(),
607            ActiveFilter {
608                block: last_poll_block_number,
609                last_poll_timestamp: Instant::now(),
610                kind,
611            },
612        );
613        Ok(id)
614    }
615
616    /// Returns all logs in the given _inclusive_ range that match the filter
617    ///
618    /// Returns an error if:
619    ///  - underlying database error
620    ///  - amount of matches exceeds configured limit
621    async fn get_logs_in_block_range(
622        self: Arc<Self>,
623        filter: Filter,
624        from_block: u64,
625        to_block: u64,
626        limits: QueryLimits,
627    ) -> Result<Vec<Log>, EthFilterError> {
628        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
629
630        // perform boundary checks first
631        if to_block < from_block {
632            return Err(EthFilterError::InvalidBlockRangeParams)
633        }
634
635        if let Some(max_blocks_per_filter) =
636            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
637        {
638            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
639        }
640
641        let (tx, rx) = oneshot::channel();
642        let this = self.clone();
643        self.task_spawner.spawn_blocking_task(async move {
644            let res =
645                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
646            let _ = tx.send(res);
647        });
648
649        rx.await.map_err(|_| EthFilterError::InternalError)?
650    }
651
652    /// Returns all logs in the given _inclusive_ range that match the filter
653    ///
654    /// Note: This function uses a mix of blocking db operations for fetching indices and header
655    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
656    /// This function is considered blocking and should thus be spawned on a blocking task.
657    ///
658    /// Returns an error if:
659    ///  - underlying database error
660    async fn get_logs_in_block_range_inner(
661        self: Arc<Self>,
662        filter: &Filter,
663        from_block: u64,
664        to_block: u64,
665        limits: QueryLimits,
666    ) -> Result<Vec<Log>, EthFilterError> {
667        let mut all_logs = Vec::new();
668        let mut matching_headers = Vec::new();
669
670        // get current chain tip to determine processing mode
671        let chain_tip = self.provider().best_block_number()?;
672
673        // first collect all headers that match the bloom filter for cached mode decision
674        for (from, to) in
675            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
676        {
677            let headers = self.provider().headers_range(from..=to)?;
678
679            let mut headers_iter = headers.into_iter().peekable();
680
681            while let Some(header) = headers_iter.next() {
682                if !filter.matches_bloom(header.logs_bloom()) {
683                    continue
684                }
685
686                let current_number = header.number();
687
688                let block_hash = match headers_iter.peek() {
689                    Some(next_header) if next_header.number() == current_number + 1 => {
690                        // Headers are consecutive, use the more efficient parent_hash
691                        next_header.parent_hash()
692                    }
693                    _ => {
694                        // Headers not consecutive or last header, calculate hash
695                        header.hash_slow()
696                    }
697                };
698
699                matching_headers.push(SealedHeader::new(header, block_hash));
700            }
701        }
702
703        // initialize the appropriate range mode based on collected headers
704        let mut range_mode = RangeMode::new(
705            self.clone(),
706            matching_headers,
707            from_block,
708            to_block,
709            self.max_headers_range,
710            chain_tip,
711        );
712
713        // iterate through the range mode to get receipts and blocks
714        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
715            range_mode.next().await?
716        {
717            let num_hash = header.num_hash();
718            append_matching_block_logs(
719                &mut all_logs,
720                recovered_block
721                    .map(ProviderOrBlock::Block)
722                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
723                filter,
724                num_hash,
725                &receipts,
726                false,
727                header.timestamp(),
728            )?;
729
730            // size check but only if range is multiple blocks, so we always return all
731            // logs of a single block
732            let is_multi_block_range = from_block != to_block;
733            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
734                is_multi_block_range &&
735                all_logs.len() > max_logs_per_response
736            {
737                debug!(
738                    target: "rpc::eth::filter",
739                    logs_found = all_logs.len(),
740                    max_logs_per_response,
741                    from_block,
742                    to_block = num_hash.number,
743                    "Query exceeded max logs per response limit"
744                );
745                return Err(EthFilterError::QueryExceedsMaxResults {
746                    max_logs: max_logs_per_response,
747                    from_block,
748                    to_block: num_hash.number,
749                });
750            }
751        }
752
753        Ok(all_logs)
754    }
755}
756
757/// All active filters
758#[derive(Debug, Clone, Default)]
759pub struct ActiveFilters<T> {
760    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
761}
762
763impl<T> ActiveFilters<T> {
764    /// Returns an empty instance.
765    pub fn new() -> Self {
766        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
767    }
768
769    /// Returns `true` if a filter with the given id exists.
770    pub async fn contains(&self, id: &FilterId) -> bool {
771        self.inner.lock().await.contains_key(id)
772    }
773
774    /// Returns the number of currently active filters.
775    pub async fn len(&self) -> usize {
776        self.inner.lock().await.len()
777    }
778
779    /// Returns `true` if there are no active filters.
780    pub async fn is_empty(&self) -> bool {
781        self.inner.lock().await.is_empty()
782    }
783
784    /// Returns all active filter ids.
785    pub async fn ids(&self) -> Vec<FilterId> {
786        self.inner.lock().await.keys().cloned().collect()
787    }
788}
789
790/// An installed filter
791#[derive(Debug)]
792struct ActiveFilter<T> {
793    /// At which block the filter was polled last.
794    block: u64,
795    /// Last time this filter was polled.
796    last_poll_timestamp: Instant,
797    /// What kind of filter it is.
798    kind: FilterKind<T>,
799}
800
801/// A receiver for pending transactions that returns all new transactions since the last poll.
802#[derive(Debug, Clone)]
803struct PendingTransactionsReceiver {
804    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
805}
806
807impl PendingTransactionsReceiver {
808    fn new(receiver: Receiver<TxHash>) -> Self {
809        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
810    }
811
812    /// Returns all new pending transactions received since the last poll.
813    async fn drain<T>(&self) -> FilterChanges<T> {
814        let mut pending_txs = Vec::new();
815        let mut prepared_stream = self.txs_receiver.lock().await;
816
817        while let Ok(tx_hash) = prepared_stream.try_recv() {
818            pending_txs.push(tx_hash);
819        }
820
821        // Convert the vector of hashes into FilterChanges::Hashes
822        FilterChanges::Hashes(pending_txs)
823    }
824}
825
826/// A structure to manage and provide access to a stream of full transaction details.
827#[derive(Debug, Clone)]
828struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
829    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
830    converter: TxCompat,
831}
832
833impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
834where
835    T: PoolTransaction + 'static,
836    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
837{
838    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
839    fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
840        Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
841    }
842
843    /// Returns all new pending transactions received since the last poll.
844    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
845        let mut pending_txs = Vec::new();
846        let mut prepared_stream = self.txs_stream.lock().await;
847
848        while let Ok(tx) = prepared_stream.try_recv() {
849            match self.converter.fill_pending(tx.transaction.to_consensus()) {
850                Ok(tx) => pending_txs.push(tx),
851                Err(err) => {
852                    error!(target: "rpc",
853                        %err,
854                        "Failed to fill txn with block context"
855                    );
856                }
857            }
858        }
859        FilterChanges::Transactions(pending_txs)
860    }
861}
862
863/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
864#[async_trait]
865trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
866    async fn drain(&self) -> FilterChanges<T>;
867}
868
869#[async_trait]
870impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
871    for FullTransactionsReceiver<T, TxCompat>
872where
873    T: PoolTransaction + 'static,
874    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
875{
876    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
877        Self::drain(self).await
878    }
879}
880
881/// Represents the kind of pending transaction data that can be retrieved.
882///
883/// This enum differentiates between two kinds of pending transaction data:
884/// - Just the transaction hashes.
885/// - Full transaction details.
886#[derive(Debug, Clone)]
887enum PendingTransactionKind<T> {
888    Hashes(PendingTransactionsReceiver),
889    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
890}
891
892impl<T: 'static> PendingTransactionKind<T> {
893    async fn drain(&self) -> FilterChanges<T> {
894        match self {
895            Self::Hashes(receiver) => receiver.drain().await,
896            Self::FullTransaction(receiver) => receiver.drain().await,
897        }
898    }
899}
900
901#[derive(Clone, Debug)]
902enum FilterKind<T> {
903    Log(Box<Filter>),
904    Block,
905    PendingTransaction(PendingTransactionKind<T>),
906}
907
908/// An iterator that yields _inclusive_ block ranges of a given step size
909#[derive(Debug)]
910struct BlockRangeInclusiveIter {
911    iter: StepBy<RangeInclusive<u64>>,
912    step: u64,
913    end: u64,
914}
915
916impl BlockRangeInclusiveIter {
917    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
918        Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
919    }
920}
921
922impl Iterator for BlockRangeInclusiveIter {
923    type Item = (u64, u64);
924
925    fn next(&mut self) -> Option<Self::Item> {
926        let start = self.iter.next()?;
927        let end = (start + self.step).min(self.end);
928        if start > end {
929            return None
930        }
931        Some((start, end))
932    }
933}
934
935/// Errors that can occur in the handler implementation
936#[derive(Debug, thiserror::Error)]
937pub enum EthFilterError {
938    /// Filter not found.
939    #[error("filter not found")]
940    FilterNotFound(FilterId),
941    /// Invalid block range.
942    #[error("invalid block range params")]
943    InvalidBlockRangeParams,
944    /// Block range extends beyond current head.
945    #[error("block range extends beyond current head block")]
946    BlockRangeExceedsHead,
947    /// Query scope is too broad.
948    #[error("query exceeds max block range {0}")]
949    QueryExceedsMaxBlocks(u64),
950    /// Query result is too large.
951    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
952    QueryExceedsMaxResults {
953        /// Maximum number of logs allowed per response
954        max_logs: usize,
955        /// Start block of the suggested retry range
956        from_block: u64,
957        /// End block of the suggested retry range (last successfully processed block)
958        to_block: u64,
959    },
960    /// Error serving request in `eth_` namespace.
961    #[error(transparent)]
962    EthAPIError(#[from] EthApiError),
963    /// Error thrown when a spawned task failed to deliver a response.
964    #[error("internal filter error")]
965    InternalError,
966}
967
968impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
969    fn from(err: EthFilterError) -> Self {
970        match err {
971            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
972                jsonrpsee::types::error::INVALID_PARAMS_CODE,
973                "filter not found",
974            ),
975            err @ EthFilterError::InternalError => {
976                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
977            }
978            EthFilterError::EthAPIError(err) => err.into(),
979            err @ (EthFilterError::InvalidBlockRangeParams |
980            EthFilterError::QueryExceedsMaxBlocks(_) |
981            EthFilterError::QueryExceedsMaxResults { .. } |
982            EthFilterError::BlockRangeExceedsHead) => {
983                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
984            }
985        }
986    }
987}
988
989impl From<ProviderError> for EthFilterError {
990    fn from(err: ProviderError) -> Self {
991        Self::EthAPIError(err.into())
992    }
993}
994
995impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
996    fn from(err: logs_utils::FilterBlockRangeError) -> Self {
997        match err {
998            logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
999            logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
1000        }
1001    }
1002}
1003
1004/// Helper type for the common pattern of returning receipts, block and the original header that is
1005/// a match for the filter.
1006struct ReceiptBlockResult<P>
1007where
1008    P: ReceiptProvider + BlockReader,
1009{
1010    /// We always need the entire receipts for the matching block.
1011    receipts: Arc<Vec<ProviderReceipt<P>>>,
1012    /// Block can be optional and we can fetch it lazily when needed.
1013    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
1014    /// The header of the block.
1015    header: SealedHeader<<P as HeaderProvider>::Header>,
1016}
1017
1018/// Represents different modes for processing block ranges when filtering logs
1019enum RangeMode<
1020    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1021        + EthApiTypes
1022        + LoadReceipt
1023        + EthBlocks
1024        + 'static,
1025> {
1026    /// Use cache-based processing for recent blocks
1027    Cached(CachedMode<Eth>),
1028    /// Use range-based processing for older blocks
1029    Range(RangeBlockMode<Eth>),
1030}
1031
1032impl<
1033        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1034            + EthApiTypes
1035            + LoadReceipt
1036            + EthBlocks
1037            + 'static,
1038    > RangeMode<Eth>
1039{
1040    /// Creates a new `RangeMode`.
1041    fn new(
1042        filter_inner: Arc<EthFilterInner<Eth>>,
1043        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1044        from_block: u64,
1045        to_block: u64,
1046        max_headers_range: u64,
1047        chain_tip: u64,
1048    ) -> Self {
1049        let block_count = to_block - from_block + 1;
1050        let distance_from_tip = chain_tip.saturating_sub(to_block);
1051
1052        // Determine if we should use cached mode based on range characteristics
1053        let use_cached_mode =
1054            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1055
1056        if use_cached_mode && !sealed_headers.is_empty() {
1057            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1058        } else {
1059            Self::Range(RangeBlockMode {
1060                filter_inner,
1061                iter: sealed_headers.into_iter().peekable(),
1062                next: VecDeque::new(),
1063                max_range: max_headers_range as usize,
1064                pending_tasks: FuturesOrdered::new(),
1065            })
1066        }
1067    }
1068
1069    /// Determines whether to use cached mode based on bloom filter matches and range size
1070    const fn should_use_cached_mode(
1071        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1072        block_count: u64,
1073        distance_from_tip: u64,
1074    ) -> bool {
1075        // Headers are already filtered by bloom, so count equals length
1076        let bloom_matches = headers.len();
1077
1078        // Calculate adjusted threshold based on bloom matches
1079        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1080
1081        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1082    }
1083
1084    /// Calculates the adjusted cache threshold based on bloom filter matches
1085    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1086        // Only apply adjustments for larger ranges
1087        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1088            return CACHED_MODE_BLOCK_THRESHOLD;
1089        }
1090
1091        match bloom_matches {
1092            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1093            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1094            _ => CACHED_MODE_BLOCK_THRESHOLD,
1095        }
1096    }
1097
1098    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
1099    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1100        match self {
1101            Self::Cached(cached) => cached.next().await,
1102            Self::Range(range) => range.next().await,
1103        }
1104    }
1105}
1106
1107/// Mode for processing blocks using cache optimization for recent blocks
1108struct CachedMode<
1109    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1110        + EthApiTypes
1111        + LoadReceipt
1112        + EthBlocks
1113        + 'static,
1114> {
1115    filter_inner: Arc<EthFilterInner<Eth>>,
1116    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1117}
1118
1119impl<
1120        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1121            + EthApiTypes
1122            + LoadReceipt
1123            + EthBlocks
1124            + 'static,
1125    > CachedMode<Eth>
1126{
1127    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1128        for header in self.headers_iter.by_ref() {
1129            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1130            if let Some((receipts, maybe_block)) =
1131                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1132            {
1133                return Ok(Some(ReceiptBlockResult {
1134                    receipts,
1135                    recovered_block: maybe_block,
1136                    header,
1137                }));
1138            }
1139        }
1140
1141        Ok(None) // No more headers
1142    }
1143}
1144
1145/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1146type ReceiptFetchFuture<P> =
1147    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1148
1149/// Mode for processing blocks using range queries for older blocks
1150struct RangeBlockMode<
1151    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1152        + EthApiTypes
1153        + LoadReceipt
1154        + EthBlocks
1155        + 'static,
1156> {
1157    filter_inner: Arc<EthFilterInner<Eth>>,
1158    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1159    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1160    max_range: usize,
1161    // Stream of ongoing receipt fetching tasks
1162    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1163}
1164
1165impl<
1166        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1167            + EthApiTypes
1168            + LoadReceipt
1169            + EthBlocks
1170            + 'static,
1171    > RangeBlockMode<Eth>
1172{
1173    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1174        loop {
1175            // First, try to return any already processed result from buffer
1176            if let Some(result) = self.next.pop_front() {
1177                return Ok(Some(result));
1178            }
1179
1180            // Try to get a completed task result if there are pending tasks
1181            if let Some(task_result) = self.pending_tasks.next().await {
1182                self.next.extend(task_result?);
1183                continue;
1184            }
1185
1186            // No pending tasks - try to generate more work
1187            let Some(next_header) = self.iter.next() else {
1188                // No more headers to process
1189                return Ok(None);
1190            };
1191
1192            let mut range_headers = Vec::with_capacity(self.max_range);
1193            range_headers.push(next_header);
1194
1195            // Collect consecutive blocks up to max_range size
1196            while range_headers.len() < self.max_range {
1197                let Some(peeked) = self.iter.peek() else { break };
1198                let Some(last_header) = range_headers.last() else { break };
1199
1200                let expected_next = last_header.number() + 1;
1201                if peeked.number() != expected_next {
1202                    trace!(
1203                        target: "rpc::eth::filter",
1204                        last_block = last_header.number(),
1205                        next_block = peeked.number(),
1206                        expected = expected_next,
1207                        range_size = range_headers.len(),
1208                        "Non-consecutive block detected, stopping range collection"
1209                    );
1210                    break; // Non-consecutive block, stop here
1211                }
1212
1213                let Some(next_header) = self.iter.next() else { break };
1214                range_headers.push(next_header);
1215            }
1216
1217            // Check if we should use parallel processing for large ranges
1218            let remaining_headers = self.iter.len() + range_headers.len();
1219            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1220                self.spawn_parallel_tasks(range_headers);
1221                // Continue loop to await the spawned tasks
1222            } else {
1223                // Process small range sequentially and add results to buffer
1224                if let Some(result) = self.process_small_range(range_headers).await? {
1225                    return Ok(Some(result));
1226                }
1227                // Continue loop to check for more work
1228            }
1229        }
1230    }
1231
1232    /// Process a small range of headers sequentially
1233    ///
1234    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1235    async fn process_small_range(
1236        &mut self,
1237        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1238    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1239        // Process each header individually to avoid queuing for all receipts
1240        for header in range_headers {
1241            // First check if already cached to avoid unnecessary provider calls
1242            let (maybe_block, maybe_receipts) = self
1243                .filter_inner
1244                .eth_cache()
1245                .maybe_cached_block_and_receipts(header.hash())
1246                .await?;
1247
1248            let receipts = match maybe_receipts {
1249                Some(receipts) => receipts,
1250                None => {
1251                    // Not cached - fetch directly from provider
1252                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1253                        Some(receipts) => Arc::new(receipts),
1254                        None => continue, // No receipts found
1255                    }
1256                }
1257            };
1258
1259            if !receipts.is_empty() {
1260                self.next.push_back(ReceiptBlockResult {
1261                    receipts,
1262                    recovered_block: maybe_block,
1263                    header,
1264                });
1265            }
1266        }
1267
1268        Ok(self.next.pop_front())
1269    }
1270
1271    /// Spawn parallel tasks for processing a large range of headers
1272    ///
1273    /// This is used when the remaining headers count is at or above
1274    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1275    fn spawn_parallel_tasks(
1276        &mut self,
1277        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1278    ) {
1279        // Split headers into chunks
1280        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1281        let header_chunks = range_headers
1282            .into_iter()
1283            .chunks(chunk_size)
1284            .into_iter()
1285            .map(|chunk| chunk.collect::<Vec<_>>())
1286            .collect::<Vec<_>>();
1287
1288        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1289        for chunk_headers in header_chunks {
1290            let filter_inner = self.filter_inner.clone();
1291            let chunk_task = Box::pin(async move {
1292                let chunk_task = tokio::task::spawn_blocking(move || {
1293                    let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1294
1295                    for header in chunk_headers {
1296                        // Fetch directly from provider - RangeMode is used for older blocks
1297                        // unlikely to be cached
1298                        let receipts = match filter_inner
1299                            .provider()
1300                            .receipts_by_block(header.hash().into())?
1301                        {
1302                            Some(receipts) => Arc::new(receipts),
1303                            None => continue, // No receipts found
1304                        };
1305
1306                        if !receipts.is_empty() {
1307                            chunk_results.push(ReceiptBlockResult {
1308                                receipts,
1309                                recovered_block: None,
1310                                header,
1311                            });
1312                        }
1313                    }
1314
1315                    Ok(chunk_results)
1316                });
1317
1318                // Await the blocking task and handle the result
1319                match chunk_task.await {
1320                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1321                    Ok(Err(e)) => Err(e),
1322                    Err(join_err) => {
1323                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1324                        Err(EthFilterError::InternalError)
1325                    }
1326                }
1327            });
1328
1329            self.pending_tasks.push_back(chunk_task);
1330        }
1331    }
1332}
1333
1334#[cfg(test)]
1335mod tests {
1336    use super::*;
1337    use crate::{eth::EthApi, EthApiBuilder};
1338    use alloy_network::Ethereum;
1339    use alloy_primitives::FixedBytes;
1340    use rand::Rng;
1341    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1342    use reth_ethereum_primitives::TxType;
1343    use reth_evm_ethereum::EthEvmConfig;
1344    use reth_network_api::noop::NoopNetwork;
1345    use reth_provider::test_utils::MockEthProvider;
1346    use reth_rpc_convert::RpcConverter;
1347    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1348    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1349    use reth_tasks::Runtime;
1350    use reth_testing_utils::generators;
1351    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1352    use std::{collections::VecDeque, sync::Arc};
1353
1354    #[test]
1355    fn test_block_range_iter() {
1356        let mut rng = generators::rng();
1357
1358        let start = rng.random::<u32>() as u64;
1359        let end = start.saturating_add(rng.random::<u32>() as u64);
1360        let step = rng.random::<u16>() as u64;
1361        let range = start..=end;
1362        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1363        let (from, mut end) = iter.next().unwrap();
1364        assert_eq!(from, start);
1365        assert_eq!(end, (from + step).min(*range.end()));
1366
1367        for (next_from, next_end) in iter {
1368            // ensure range starts with previous end + 1
1369            assert_eq!(next_from, end + 1);
1370            end = next_end;
1371        }
1372
1373        assert_eq!(end, *range.end());
1374    }
1375
1376    // Helper function to create a test EthApi instance
1377    #[expect(clippy::type_complexity)]
1378    fn build_test_eth_api(
1379        provider: MockEthProvider,
1380    ) -> EthApi<
1381        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1382        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1383    > {
1384        EthApiBuilder::new(
1385            provider.clone(),
1386            testing_pool(),
1387            NoopNetwork::default(),
1388            EthEvmConfig::new(provider.chain_spec()),
1389        )
1390        .build()
1391    }
1392
1393    #[tokio::test]
1394    async fn test_range_block_mode_empty_range() {
1395        let provider = MockEthProvider::default();
1396        let eth_api = build_test_eth_api(provider);
1397
1398        let eth_filter =
1399            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1400        let filter_inner = eth_filter.inner;
1401
1402        let headers = vec![];
1403        let max_range = 100;
1404
1405        let mut range_mode = RangeBlockMode {
1406            filter_inner,
1407            iter: headers.into_iter().peekable(),
1408            next: VecDeque::new(),
1409            max_range,
1410            pending_tasks: FuturesOrdered::new(),
1411        };
1412
1413        let result = range_mode.next().await;
1414        assert!(result.is_ok());
1415        assert!(result.unwrap().is_none());
1416    }
1417
1418    #[tokio::test]
1419    async fn test_range_block_mode_queued_results_priority() {
1420        let provider = MockEthProvider::default();
1421        let eth_api = build_test_eth_api(provider);
1422
1423        let eth_filter =
1424            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1425        let filter_inner = eth_filter.inner;
1426
1427        let headers = vec![
1428            SealedHeader::new(
1429                alloy_consensus::Header { number: 100, ..Default::default() },
1430                FixedBytes::random(),
1431            ),
1432            SealedHeader::new(
1433                alloy_consensus::Header { number: 101, ..Default::default() },
1434                FixedBytes::random(),
1435            ),
1436        ];
1437
1438        // create specific mock results to test ordering
1439        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1440        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1441
1442        // create mock receipts to test receipt handling
1443        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1444            tx_type: TxType::Legacy,
1445            cumulative_gas_used: 100_000,
1446            logs: vec![],
1447            success: true,
1448        };
1449        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1450            tx_type: TxType::Eip1559,
1451            cumulative_gas_used: 200_000,
1452            logs: vec![],
1453            success: true,
1454        };
1455        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1456            tx_type: TxType::Eip2930,
1457            cumulative_gas_used: 150_000,
1458            logs: vec![],
1459            success: false, // Different success status
1460        };
1461
1462        let mock_result_1 = ReceiptBlockResult {
1463            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1464            recovered_block: None,
1465            header: SealedHeader::new(
1466                alloy_consensus::Header { number: 42, ..Default::default() },
1467                expected_block_hash_1,
1468            ),
1469        };
1470
1471        let mock_result_2 = ReceiptBlockResult {
1472            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1473            recovered_block: None,
1474            header: SealedHeader::new(
1475                alloy_consensus::Header { number: 43, ..Default::default() },
1476                expected_block_hash_2,
1477            ),
1478        };
1479
1480        let mut range_mode = RangeBlockMode {
1481            filter_inner,
1482            iter: headers.into_iter().peekable(),
1483            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1484            max_range: 100,
1485            pending_tasks: FuturesOrdered::new(),
1486        };
1487
1488        // first call should return the first queued result (FIFO order)
1489        let result1 = range_mode.next().await;
1490        assert!(result1.is_ok());
1491        let receipt_result1 = result1.unwrap().unwrap();
1492        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1493        assert_eq!(receipt_result1.header.number, 42);
1494
1495        // verify receipts
1496        assert_eq!(receipt_result1.receipts.len(), 2);
1497        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1498        assert_eq!(
1499            receipt_result1.receipts[0].cumulative_gas_used,
1500            mock_receipt_1.cumulative_gas_used
1501        );
1502        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1503        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1504        assert_eq!(
1505            receipt_result1.receipts[1].cumulative_gas_used,
1506            mock_receipt_2.cumulative_gas_used
1507        );
1508        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1509
1510        // second call should return the second queued result
1511        let result2 = range_mode.next().await;
1512        assert!(result2.is_ok());
1513        let receipt_result2 = result2.unwrap().unwrap();
1514        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1515        assert_eq!(receipt_result2.header.number, 43);
1516
1517        // verify receipts
1518        assert_eq!(receipt_result2.receipts.len(), 1);
1519        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1520        assert_eq!(
1521            receipt_result2.receipts[0].cumulative_gas_used,
1522            mock_receipt_3.cumulative_gas_used
1523        );
1524        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1525
1526        // queue should now be empty
1527        assert!(range_mode.next.is_empty());
1528
1529        let result3 = range_mode.next().await;
1530        assert!(result3.is_ok());
1531    }
1532
1533    #[tokio::test]
1534    async fn test_range_block_mode_single_block_no_receipts() {
1535        let provider = MockEthProvider::default();
1536        let eth_api = build_test_eth_api(provider);
1537
1538        let eth_filter =
1539            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1540        let filter_inner = eth_filter.inner;
1541
1542        let headers = vec![SealedHeader::new(
1543            alloy_consensus::Header { number: 100, ..Default::default() },
1544            FixedBytes::random(),
1545        )];
1546
1547        let mut range_mode = RangeBlockMode {
1548            filter_inner,
1549            iter: headers.into_iter().peekable(),
1550            next: VecDeque::new(),
1551            max_range: 100,
1552            pending_tasks: FuturesOrdered::new(),
1553        };
1554
1555        let result = range_mode.next().await;
1556        assert!(result.is_ok());
1557    }
1558
1559    #[tokio::test]
1560    async fn test_range_block_mode_provider_receipts() {
1561        let provider = MockEthProvider::default();
1562
1563        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1564        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1565        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1566
1567        let block_hash_1 = FixedBytes::random();
1568        let block_hash_2 = FixedBytes::random();
1569        let block_hash_3 = FixedBytes::random();
1570
1571        provider.add_header(block_hash_1, header_1.clone());
1572        provider.add_header(block_hash_2, header_2.clone());
1573        provider.add_header(block_hash_3, header_3.clone());
1574
1575        // create mock receipts to test provider fetching with mock logs
1576        let mock_log = alloy_primitives::Log {
1577            address: alloy_primitives::Address::ZERO,
1578            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1579        };
1580
1581        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1582            tx_type: TxType::Legacy,
1583            cumulative_gas_used: 21_000,
1584            logs: vec![mock_log.clone()],
1585            success: true,
1586        };
1587        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1588            tx_type: TxType::Eip1559,
1589            cumulative_gas_used: 42_000,
1590            logs: vec![mock_log.clone()],
1591            success: true,
1592        };
1593        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1594            tx_type: TxType::Eip2930,
1595            cumulative_gas_used: 30_000,
1596            logs: vec![mock_log.clone()],
1597            success: false,
1598        };
1599
1600        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1601        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1602
1603        let eth_api = build_test_eth_api(provider);
1604
1605        let eth_filter =
1606            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1607        let filter_inner = eth_filter.inner;
1608
1609        let headers = vec![
1610            SealedHeader::new(header_1, block_hash_1),
1611            SealedHeader::new(header_2, block_hash_2),
1612            SealedHeader::new(header_3, block_hash_3),
1613        ];
1614
1615        let mut range_mode = RangeBlockMode {
1616            filter_inner,
1617            iter: headers.into_iter().peekable(),
1618            next: VecDeque::new(),
1619            max_range: 3, // include the 3 blocks in the first queried results
1620            pending_tasks: FuturesOrdered::new(),
1621        };
1622
1623        // first call should fetch receipts from provider and return first block with receipts
1624        let result = range_mode.next().await;
1625        assert!(result.is_ok());
1626        let receipt_result = result.unwrap().unwrap();
1627
1628        assert_eq!(receipt_result.header.hash(), block_hash_1);
1629        assert_eq!(receipt_result.header.number, 100);
1630        assert_eq!(receipt_result.receipts.len(), 2);
1631
1632        // verify receipts
1633        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1634        assert_eq!(
1635            receipt_result.receipts[0].cumulative_gas_used,
1636            receipt_100_1.cumulative_gas_used
1637        );
1638        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1639
1640        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1641        assert_eq!(
1642            receipt_result.receipts[1].cumulative_gas_used,
1643            receipt_100_2.cumulative_gas_used
1644        );
1645        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1646
1647        // second call should return the second block with receipts
1648        let result2 = range_mode.next().await;
1649        assert!(result2.is_ok());
1650        let receipt_result2 = result2.unwrap().unwrap();
1651
1652        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1653        assert_eq!(receipt_result2.header.number, 101);
1654        assert_eq!(receipt_result2.receipts.len(), 1);
1655
1656        // verify receipts
1657        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1658        assert_eq!(
1659            receipt_result2.receipts[0].cumulative_gas_used,
1660            receipt_101_1.cumulative_gas_used
1661        );
1662        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1663
1664        // third call should return None since no more blocks with receipts
1665        let result3 = range_mode.next().await;
1666        assert!(result3.is_ok());
1667        assert!(result3.unwrap().is_none());
1668    }
1669
1670    #[tokio::test]
1671    async fn test_range_block_mode_iterator_exhaustion() {
1672        let provider = MockEthProvider::default();
1673
1674        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1675        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1676
1677        let block_hash_100 = FixedBytes::random();
1678        let block_hash_101 = FixedBytes::random();
1679
1680        // Associate headers with hashes first
1681        provider.add_header(block_hash_100, header_100.clone());
1682        provider.add_header(block_hash_101, header_101.clone());
1683
1684        // Add mock receipts so headers are actually processed
1685        let mock_receipt = reth_ethereum_primitives::Receipt {
1686            tx_type: TxType::Legacy,
1687            cumulative_gas_used: 21_000,
1688            logs: vec![],
1689            success: true,
1690        };
1691        provider.add_receipts(100, vec![mock_receipt.clone()]);
1692        provider.add_receipts(101, vec![mock_receipt.clone()]);
1693
1694        let eth_api = build_test_eth_api(provider);
1695
1696        let eth_filter =
1697            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1698        let filter_inner = eth_filter.inner;
1699
1700        let headers = vec![
1701            SealedHeader::new(header_100, block_hash_100),
1702            SealedHeader::new(header_101, block_hash_101),
1703        ];
1704
1705        let mut range_mode = RangeBlockMode {
1706            filter_inner,
1707            iter: headers.into_iter().peekable(),
1708            next: VecDeque::new(),
1709            max_range: 1,
1710            pending_tasks: FuturesOrdered::new(),
1711        };
1712
1713        let result1 = range_mode.next().await;
1714        assert!(result1.is_ok());
1715        assert!(result1.unwrap().is_some()); // Should have processed block 100
1716
1717        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1718
1719        let result2 = range_mode.next().await;
1720        assert!(result2.is_ok());
1721        assert!(result2.unwrap().is_some()); // Should have processed block 101
1722
1723        // now iterator should be exhausted
1724        assert!(range_mode.iter.peek().is_none());
1725
1726        // further calls should return None
1727        let result3 = range_mode.next().await;
1728        assert!(result3.is_ok());
1729        assert!(result3.unwrap().is_none());
1730    }
1731
1732    #[tokio::test]
1733    async fn test_cached_mode_with_mock_receipts() {
1734        // create test data
1735        let test_hash = FixedBytes::from([42u8; 32]);
1736        let test_block_number = 100u64;
1737        let test_header = SealedHeader::new(
1738            alloy_consensus::Header {
1739                number: test_block_number,
1740                gas_used: 50_000,
1741                ..Default::default()
1742            },
1743            test_hash,
1744        );
1745
1746        // add a mock receipt to the provider with a mock log
1747        let mock_log = alloy_primitives::Log {
1748            address: alloy_primitives::Address::ZERO,
1749            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1750        };
1751
1752        let mock_receipt = reth_ethereum_primitives::Receipt {
1753            tx_type: TxType::Legacy,
1754            cumulative_gas_used: 21_000,
1755            logs: vec![mock_log],
1756            success: true,
1757        };
1758
1759        let provider = MockEthProvider::default();
1760        provider.add_header(test_hash, test_header.header().clone());
1761        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1762
1763        let eth_api = build_test_eth_api(provider);
1764        let eth_filter =
1765            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1766        let filter_inner = eth_filter.inner;
1767
1768        let headers = vec![test_header.clone()];
1769
1770        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1771
1772        // should find the receipt from provider fallback (cache will be empty)
1773        let result = cached_mode.next().await.expect("next should succeed");
1774        let receipt_block_result = result.expect("should have receipt result");
1775        assert_eq!(receipt_block_result.header.hash(), test_hash);
1776        assert_eq!(receipt_block_result.header.number, test_block_number);
1777        assert_eq!(receipt_block_result.receipts.len(), 1);
1778        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1779        assert_eq!(
1780            receipt_block_result.receipts[0].cumulative_gas_used,
1781            mock_receipt.cumulative_gas_used
1782        );
1783        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1784
1785        // iterator should be exhausted
1786        let result2 = cached_mode.next().await;
1787        assert!(result2.is_ok());
1788        assert!(result2.unwrap().is_none());
1789    }
1790
1791    #[tokio::test]
1792    async fn test_cached_mode_empty_headers() {
1793        let provider = MockEthProvider::default();
1794        let eth_api = build_test_eth_api(provider);
1795
1796        let eth_filter =
1797            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1798        let filter_inner = eth_filter.inner;
1799
1800        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1801
1802        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1803
1804        // should immediately return None for empty headers
1805        let result = cached_mode.next().await.expect("next should succeed");
1806        assert!(result.is_none());
1807    }
1808
1809    #[tokio::test]
1810    async fn test_non_consecutive_headers_after_bloom_filter() {
1811        let provider = MockEthProvider::default();
1812
1813        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1814        let mut expected_hashes = vec![];
1815        let mut prev_hash = alloy_primitives::B256::default();
1816
1817        // Create a transaction for blocks that will have receipts
1818        use alloy_consensus::TxLegacy;
1819        use reth_ethereum_primitives::{TransactionSigned, TxType};
1820
1821        let tx_inner = TxLegacy {
1822            chain_id: Some(1),
1823            nonce: 0,
1824            gas_price: 21_000,
1825            gas_limit: 21_000,
1826            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1827            value: alloy_primitives::U256::ZERO,
1828            input: alloy_primitives::Bytes::new(),
1829        };
1830        let signature = alloy_primitives::Signature::test_signature();
1831        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1832
1833        for i in 100u64..=103 {
1834            let header = alloy_consensus::Header {
1835                number: i,
1836                parent_hash: prev_hash,
1837                // Set bloom to match filter only for blocks 100 and 102
1838                logs_bloom: if i == 100 || i == 102 {
1839                    alloy_primitives::Bloom::from([1u8; 256])
1840                } else {
1841                    alloy_primitives::Bloom::default()
1842                },
1843                ..Default::default()
1844            };
1845
1846            let hash = header.hash_slow();
1847            expected_hashes.push(hash);
1848            prev_hash = hash;
1849
1850            // Add transaction to blocks that will have receipts (100 and 102)
1851            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1852
1853            let block = reth_ethereum_primitives::Block {
1854                header,
1855                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1856            };
1857            provider.add_block(hash, block);
1858        }
1859
1860        // Add receipts with logs only to blocks that match bloom
1861        let mock_log = alloy_primitives::Log {
1862            address: alloy_primitives::Address::ZERO,
1863            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1864        };
1865
1866        let receipt = reth_ethereum_primitives::Receipt {
1867            tx_type: TxType::Legacy,
1868            cumulative_gas_used: 21_000,
1869            logs: vec![mock_log],
1870            success: true,
1871        };
1872
1873        provider.add_receipts(100, vec![receipt.clone()]);
1874        provider.add_receipts(101, vec![]);
1875        provider.add_receipts(102, vec![receipt.clone()]);
1876        provider.add_receipts(103, vec![]);
1877
1878        // Add block body indices for each block so receipts can be fetched
1879        use reth_db_api::models::StoredBlockBodyIndices;
1880        provider
1881            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1882        provider
1883            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1884        provider
1885            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1886        provider
1887            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1888
1889        let eth_api = build_test_eth_api(provider);
1890        let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1891
1892        // Use default filter which will match any non-empty bloom
1893        let filter = Filter::default();
1894
1895        // Get logs in the range - this will trigger the bloom filtering
1896        let logs = eth_filter
1897            .inner
1898            .clone()
1899            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1900            .await
1901            .expect("should succeed");
1902
1903        // We should get logs from blocks 100 and 102 only (bloom filtered)
1904        assert_eq!(logs.len(), 2);
1905
1906        assert_eq!(logs[0].block_number, Some(100));
1907        assert_eq!(logs[1].block_number, Some(102));
1908
1909        // Each block hash should be the hash of its own header, not derived from any other header
1910        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1911        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1912    }
1913}