Skip to main content

reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8    PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12    future::TryFutureExt,
13    stream::{FuturesOrdered, StreamExt},
14    Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21    helpers::{EthBlocks, LoadReceipt},
22    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23    RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32    ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::Runtime;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37    collections::{HashMap, VecDeque},
38    fmt,
39    iter::{Peekable, StepBy},
40    ops::RangeInclusive,
41    pin::Pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::{
46    sync::{mpsc::Receiver, oneshot, Mutex},
47    time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53    Eth: FullEthApiTypes
54        + RpcNodeCoreExt<Provider: BlockIdReader>
55        + LoadReceipt
56        + EthBlocks
57        + 'static,
58{
59    /// Returns logs matching given filter object, no query limits
60    fn logs(
61        &self,
62        filter: Filter,
63        limits: QueryLimits,
64    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65        trace!(target: "rpc::eth", "Serving eth_getLogs");
66        self.logs_for_filter(filter, limits).map_err(|e| e.into())
67    }
68}
69
/// Threshold for deciding between cached and range mode processing.
///
/// NOTE(review): not referenced in the visible part of this file; presumably a block count
/// consumed by the range-mode selection logic — confirm at the use site.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Threshold for bloom filter matches that triggers reduced caching.
///
/// NOTE(review): presumably compared against the number of bloom-matching headers — confirm.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Threshold for bloom filter matches that triggers moderately reduced caching.
///
/// Lower than [`HIGH_BLOOM_MATCH_THRESHOLD`].
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum block count to apply bloom filter match adjustments.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// The maximum number of headers we read at once when handling a range filter.
///
/// Used as the chunk size when a requested block range is split into header reads.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb

/// Threshold for enabling parallel processing in range mode.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Default concurrency for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91/// `Eth` filter RPC implementation.
92///
93/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
94pub struct EthFilter<Eth: EthApiTypes> {
95    /// All nested fields bundled together
96    inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101    Eth: EthApiTypes,
102{
103    fn clone(&self) -> Self {
104        Self { inner: self.inner.clone() }
105    }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110    Eth: EthApiTypes + 'static,
111{
112    /// Creates a new, shareable instance.
113    ///
114    /// This uses the given pool to get notified about new transactions, the provider to interact
115    /// with the blockchain, the cache to fetch cacheable data, like the logs.
116    ///
117    /// See also [`EthFilterConfig`].
118    ///
119    /// This also spawns a task that periodically clears stale filters.
120    ///
121    /// # Create a new instance with [`EthApi`](crate::EthApi)
122    ///
123    /// ```no_run
124    /// use reth_evm_ethereum::EthEvmConfig;
125    /// use reth_network_api::noop::NoopNetwork;
126    /// use reth_provider::noop::NoopProvider;
127    /// use reth_rpc::{EthApi, EthFilter};
128    /// use reth_tasks::Runtime;
129    /// use reth_transaction_pool::noop::NoopTransactionPool;
130    /// let eth_api = EthApi::builder(
131    ///     NoopProvider::default(),
132    ///     NoopTransactionPool::default(),
133    ///     NoopNetwork::default(),
134    ///     EthEvmConfig::mainnet(),
135    /// )
136    /// .build();
137    /// let filter = EthFilter::new(eth_api, Default::default(), Runtime::test());
138    /// ```
139    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self {
140        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141            config;
142        let inner = EthFilterInner {
143            eth_api,
144            active_filters: ActiveFilters::new(),
145            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146            max_headers_range: MAX_HEADERS_RANGE,
147            task_spawner,
148            stale_filter_ttl,
149            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150        };
151
152        let eth_filter = Self { inner: Arc::new(inner) };
153
154        let this = eth_filter.clone();
155        eth_filter.inner.task_spawner.spawn_critical_task(
156            "eth-filters_stale-filters-clean",
157            async move {
158                this.watch_and_clear_stale_filters().await;
159            },
160        );
161
162        eth_filter
163    }
164
    /// Returns a reference to the container holding all currently active (installed) filters.
    ///
    /// The returned container is the one shared by every clone of this handler.
    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
        &self.inner.active_filters
    }
169
170    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
171    /// Nonetheless, this endless future frees the thread at every await point.
172    async fn watch_and_clear_stale_filters(&self) {
173        let mut interval = tokio::time::interval_at(
174            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175            self.inner.stale_filter_ttl,
176        );
177        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178        loop {
179            interval.tick().await;
180            self.clear_stale_filters(Instant::now()).await;
181        }
182    }
183
184    /// Clears all filters that have not been polled for longer than the configured
185    /// `stale_filter_ttl` at the given instant.
186    pub async fn clear_stale_filters(&self, now: Instant) {
187        trace!(target: "rpc::eth", "clear stale filters");
188        let mut filters = self.active_filters().inner.lock().await;
189        filters.retain(|id, filter| {
190            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192            if !is_valid {
193                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194            }
195
196            is_valid
197        });
198        filters.shrink_to_fit();
199    }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205        + RpcNodeCoreExt
206        + LoadReceipt
207        + EthBlocks
208        + 'static,
209{
    /// Access the underlying provider.
    ///
    /// Delegates to the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.inner.eth_api.provider()
    }

    /// Access the underlying pool.
    ///
    /// Delegates to the wrapped `eth` API.
    fn pool(&self) -> &Eth::Pool {
        self.inner.eth_api.pool()
    }
219
220    /// Returns all the filter changes for the given id, if any
221    pub async fn filter_changes(
222        &self,
223        id: FilterId,
224    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225        let info = self.provider().chain_info()?;
226        let best_number = info.best_number;
227
228        // start_block is the block from which we should start fetching changes, the next block from
229        // the last time changes were polled, in other words the best block at last poll + 1
230        let (start_block, kind) = {
231            let mut filters = self.inner.active_filters.inner.lock().await;
232            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234            if filter.block > best_number {
235                // no new blocks since the last poll
236                return Ok(FilterChanges::Empty)
237            }
238
239            // update filter
240            // we fetch all changes from [filter.block..best_block], so we advance the filter's
241            // block to `best_block +1`, the next from which we should start fetching changes again
242            let mut block = best_number + 1;
243            std::mem::swap(&mut filter.block, &mut block);
244            filter.last_poll_timestamp = Instant::now();
245
246            (block, filter.kind.clone())
247        };
248
249        match kind {
250            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251            FilterKind::Block => {
252                // Note: we need to fetch the block hashes from inclusive range
253                // [start_block..best_block]
254                let end_block = best_number + 1;
255                let block_hashes =
256                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
257                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258                    )?;
259                Ok(FilterChanges::Hashes(block_hashes))
260            }
261            FilterKind::Log(filter) => {
262                let (from_block_number, to_block_number) = match filter.block_option {
263                    FilterBlockOption::Range { from_block, to_block } => {
264                        let from = from_block
265                            .map(|num| self.provider().convert_block_number(num))
266                            .transpose()?
267                            .flatten();
268                        let to = to_block
269                            .map(|num| self.provider().convert_block_number(num))
270                            .transpose()?
271                            .flatten();
272                        logs_utils::get_filter_block_range(from, to, start_block, info)?
273                    }
274                    FilterBlockOption::AtBlockHash(block_hash) => {
275                        // blockHash is equivalent to fromBlock = toBlock = the block number with
276                        // hash blockHash
277                        // get_logs_in_block_range is inclusive
278                        let block_number = self
279                            .provider()
280                            .block_number(block_hash)?
281                            .ok_or(ProviderError::HeaderNotFound(block_hash.into()))?;
282                        (block_number, block_number)
283                    }
284                };
285                let logs = self
286                    .inner
287                    .clone()
288                    .get_logs_in_block_range(
289                        *filter,
290                        from_block_number,
291                        to_block_number,
292                        self.inner.query_limits,
293                    )
294                    .await?;
295                Ok(FilterChanges::Logs(logs))
296            }
297        }
298    }
299
300    /// Returns an array of all logs matching filter with given id.
301    ///
302    /// Returns an error if no matching log filter exists.
303    ///
304    /// Handler for `eth_getFilterLogs`
305    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
306        let filter = {
307            let mut filters = self.inner.active_filters.inner.lock().await;
308            let filter =
309                filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
310            if let FilterKind::Log(ref inner_filter) = filter.kind {
311                filter.last_poll_timestamp = Instant::now();
312                *inner_filter.clone()
313            } else {
314                // Not a log filter
315                return Err(EthFilterError::FilterNotFound(id))
316            }
317        };
318
319        self.logs_for_filter(filter, self.inner.query_limits).await
320    }
321
322    /// Returns logs matching given filter object.
323    async fn logs_for_filter(
324        &self,
325        filter: Filter,
326        limits: QueryLimits,
327    ) -> Result<Vec<Log>, EthFilterError> {
328        self.inner.clone().logs_for_filter(filter, limits).await
329    }
330}
331
332#[async_trait]
333impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
334where
335    Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
336{
337    /// Handler for `eth_newFilter`
338    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
339        trace!(target: "rpc::eth", "Serving eth_newFilter");
340        self.inner
341            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
342            .await
343    }
344
345    /// Handler for `eth_newBlockFilter`
346    async fn new_block_filter(&self) -> RpcResult<FilterId> {
347        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
348        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
349    }
350
351    /// Handler for `eth_newPendingTransactionFilter`
352    async fn new_pending_transaction_filter(
353        &self,
354        kind: Option<PendingTransactionFilterKind>,
355    ) -> RpcResult<FilterId> {
356        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
357
358        let transaction_kind = match kind.unwrap_or_default() {
359            PendingTransactionFilterKind::Hashes => {
360                let receiver = self.pool().pending_transactions_listener();
361                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
362                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
363            }
364            PendingTransactionFilterKind::Full => {
365                let stream = self.pool().new_pending_pool_transactions_listener();
366                let full_txs_receiver = FullTransactionsReceiver::new(
367                    stream,
368                    dyn_clone::clone(self.inner.eth_api.converter()),
369                );
370                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
371                    full_txs_receiver,
372                )))
373            }
374        };
375
376        // Install the filter and propagate any errors
377        self.inner.install_filter(transaction_kind).await
378    }
379
380    /// Handler for `eth_getFilterChanges`
381    async fn filter_changes(
382        &self,
383        id: FilterId,
384    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
385        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
386        Ok(Self::filter_changes(self, id).await?)
387    }
388
389    /// Returns an array of all logs matching filter with given id.
390    ///
391    /// Returns an error if no matching log filter exists.
392    ///
393    /// Handler for `eth_getFilterLogs`
394    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
395        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
396        Ok(Self::filter_logs(self, id).await?)
397    }
398
399    /// Handler for `eth_uninstallFilter`
400    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
401        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
402        let mut filters = self.inner.active_filters.inner.lock().await;
403        if filters.remove(&id).is_some() {
404            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
405            Ok(true)
406        } else {
407            Ok(false)
408        }
409    }
410
411    /// Returns logs matching given filter object.
412    ///
413    /// Handler for `eth_getLogs`
414    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
415        trace!(target: "rpc::eth", "Serving eth_getLogs");
416        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
417    }
418}
419
420impl<Eth> std::fmt::Debug for EthFilter<Eth>
421where
422    Eth: EthApiTypes,
423{
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        f.debug_struct("EthFilter").finish_non_exhaustive()
426    }
427}
428
/// Container type holding the shared state of an `EthFilter`.
///
/// A single instance is wrapped in an `Arc` and shared by every clone of the handler.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Limits applied to log queries (maximum block range and maximum number of logs).
    query_limits: QueryLimits,
    /// Maximum number of headers to read at once for a range filter.
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Runtime,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
447
448impl<Eth> EthFilterInner<Eth>
449where
450    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
451        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
452        + LoadReceipt
453        + EthBlocks
454        + 'static,
455{
    /// Access the underlying provider.
    ///
    /// Delegates to the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }

    /// Access the underlying [`EthStateCache`].
    ///
    /// Delegates to the wrapped `eth` API.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
465
    /// Returns logs matching given filter object.
    ///
    /// Two kinds of block selection are handled:
    ///  - a single block identified by hash: receipts (and the block, if cached) are taken
    ///    from the cache and the matching logs of that one block are returned
    ///  - a block range: the bounds are resolved to block numbers and the query is delegated
    ///    to [`Self::get_logs_in_block_range`], bounded by `limits`; a range starting at the
    ///    pending block is served from the local pending block when that block is ahead of
    ///    the chain
    ///
    /// # Errors
    ///
    /// Returns an error if:
    ///  - the block referenced by hash is unknown
    ///  - the requested block(s) lie before the earliest available block (pruned history)
    ///  - the resolved `toBlock` is higher than the current best block
    ///  - the underlying provider, cache or range query fails
    async fn logs_for_filter(
        self: Arc<Self>,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        match filter.block_option {
            FilterBlockOption::AtBlockHash(block_hash) => {
                // First try to get cached block and receipts, as it's likely they're already cached
                let Some((receipts, maybe_block)) =
                    self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
                else {
                    return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
                };

                // Get header - from cached block if available, otherwise from provider
                let header = if let Some(block) = &maybe_block {
                    block.header().clone()
                } else {
                    self.provider()
                        .header_by_hash_or_number(block_hash.into())?
                        .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?
                };

                // Check if the block has been pruned (EIP-4444)
                let earliest_block = self.provider().earliest_block_number()?;
                if header.number() < earliest_block {
                    return Err(EthApiError::PrunedHistoryUnavailable.into());
                }

                let block_num_hash = BlockNumHash::new(header.number(), block_hash);

                // Hand over the cached block when there is one, otherwise the provider
                let mut all_logs = Vec::new();
                append_matching_block_logs(
                    &mut all_logs,
                    maybe_block
                        .map(ProviderOrBlock::Block)
                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                    &filter,
                    block_num_hash,
                    &receipts,
                    false,
                    header.timestamp(),
                )?;

                Ok(all_logs)
            }
            FilterBlockOption::Range { from_block, to_block } => {
                // Handle special case where from block is pending
                if from_block.is_some_and(|b| b.is_pending()) {
                    // a missing upper bound is treated as `pending`
                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
                    if !(to_block.is_pending() || to_block.is_number()) {
                        // always empty range
                        return Ok(Vec::new());
                    }
                    // Try to get pending block and receipts; if that fails or there is no
                    // pending block, fall through to the regular range handling below
                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
                        if let BlockNumberOrTag::Number(to_block) = to_block &&
                            to_block < pending_block.block.number()
                        {
                            // this block range is empty based on the user input
                            return Ok(Vec::new());
                        }

                        let info = self.provider().chain_info()?;
                        if pending_block.block.number() > info.best_number {
                            // only consider the pending block if it is ahead of the chain
                            let mut all_logs = Vec::new();
                            let timestamp = pending_block.block.timestamp();
                            let block_num_hash = pending_block.block.num_hash();
                            append_matching_block_logs(
                                &mut all_logs,
                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
                                &filter,
                                block_num_hash,
                                &pending_block.receipts,
                                false, // removed = false for pending blocks
                                timestamp,
                            )?;
                            return Ok(all_logs);
                        }
                    }
                }

                // Resolve the requested bounds against the current chain info
                let info = self.provider().chain_info()?;
                let start_block = info.best_number;
                let from = from_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();
                let to = to_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();

                // Return error if toBlock exceeds current head
                if let Some(t) = to &&
                    t > info.best_number
                {
                    return Err(EthFilterError::BlockRangeExceedsHead);
                }

                if let Some(f) = from &&
                    f > info.best_number
                {
                    // start block higher than local head, can return empty
                    return Ok(Vec::new());
                }

                let (from_block_number, to_block_number) =
                    logs_utils::get_filter_block_range(from, to, start_block, info)?;

                // Check if the requested range overlaps with pruned history (EIP-4444)
                let earliest_block = self.provider().earliest_block_number()?;
                if from_block_number < earliest_block {
                    return Err(EthApiError::PrunedHistoryUnavailable.into());
                }

                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
                    .await
            }
        }
    }
589
590    /// Installs a new filter and returns the new identifier.
591    async fn install_filter(
592        &self,
593        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
594    ) -> RpcResult<FilterId> {
595        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
596        let subscription_id = self.id_provider.next_id();
597
598        let id = match subscription_id {
599            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
600            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
601        };
602        let mut filters = self.active_filters.inner.lock().await;
603        filters.insert(
604            id.clone(),
605            ActiveFilter {
606                block: last_poll_block_number,
607                last_poll_timestamp: Instant::now(),
608                kind,
609            },
610        );
611        Ok(id)
612    }
613
614    /// Returns all logs in the given _inclusive_ range that match the filter
615    ///
616    /// Returns an error if:
617    ///  - underlying database error
618    ///  - amount of matches exceeds configured limit
619    async fn get_logs_in_block_range(
620        self: Arc<Self>,
621        filter: Filter,
622        from_block: u64,
623        to_block: u64,
624        limits: QueryLimits,
625    ) -> Result<Vec<Log>, EthFilterError> {
626        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
627
628        // perform boundary checks first
629        if to_block < from_block {
630            return Err(EthFilterError::InvalidBlockRangeParams)
631        }
632
633        if let Some(max_blocks_per_filter) =
634            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
635        {
636            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
637        }
638
639        let (tx, rx) = oneshot::channel();
640        let this = self.clone();
641        self.task_spawner.spawn_blocking_task(async move {
642            let res =
643                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
644            let _ = tx.send(res);
645        });
646
647        rx.await.map_err(|_| EthFilterError::InternalError)?
648    }
649
650    /// Returns all logs in the given _inclusive_ range that match the filter
651    ///
652    /// Note: This function uses a mix of blocking db operations for fetching indices and header
653    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
654    /// This function is considered blocking and should thus be spawned on a blocking task.
655    ///
656    /// Returns an error if:
657    ///  - underlying database error
658    async fn get_logs_in_block_range_inner(
659        self: Arc<Self>,
660        filter: &Filter,
661        from_block: u64,
662        to_block: u64,
663        limits: QueryLimits,
664    ) -> Result<Vec<Log>, EthFilterError> {
665        let mut all_logs = Vec::new();
666        let mut matching_headers = Vec::new();
667
668        // get current chain tip to determine processing mode
669        let chain_tip = self.provider().best_block_number()?;
670
671        // first collect all headers that match the bloom filter for cached mode decision
672        for (from, to) in
673            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
674        {
675            let headers = self.provider().headers_range(from..=to)?;
676
677            let mut headers_iter = headers.into_iter().peekable();
678
679            while let Some(header) = headers_iter.next() {
680                if !filter.matches_bloom(header.logs_bloom()) {
681                    continue
682                }
683
684                let current_number = header.number();
685
686                let block_hash = match headers_iter.peek() {
687                    Some(next_header) if next_header.number() == current_number + 1 => {
688                        // Headers are consecutive, use the more efficient parent_hash
689                        next_header.parent_hash()
690                    }
691                    _ => {
692                        // Headers not consecutive or last header, calculate hash
693                        header.hash_slow()
694                    }
695                };
696
697                matching_headers.push(SealedHeader::new(header, block_hash));
698            }
699        }
700
701        // initialize the appropriate range mode based on collected headers
702        let mut range_mode = RangeMode::new(
703            self.clone(),
704            matching_headers,
705            from_block,
706            to_block,
707            self.max_headers_range,
708            chain_tip,
709        );
710
711        // iterate through the range mode to get receipts and blocks
712        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
713            range_mode.next().await?
714        {
715            let num_hash = header.num_hash();
716            append_matching_block_logs(
717                &mut all_logs,
718                recovered_block
719                    .map(ProviderOrBlock::Block)
720                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
721                filter,
722                num_hash,
723                &receipts,
724                false,
725                header.timestamp(),
726            )?;
727
728            // size check but only if range is multiple blocks, so we always return all
729            // logs of a single block
730            let is_multi_block_range = from_block != to_block;
731            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
732                is_multi_block_range &&
733                all_logs.len() > max_logs_per_response
734            {
735                debug!(
736                    target: "rpc::eth::filter",
737                    logs_found = all_logs.len(),
738                    max_logs_per_response,
739                    from_block,
740                    to_block = num_hash.number,
741                    "Query exceeded max logs per response limit"
742                );
743                return Err(EthFilterError::QueryExceedsMaxResults {
744                    max_logs: max_logs_per_response,
745                    from_block,
746                    to_block: num_hash.number,
747                });
748            }
749        }
750
751        Ok(all_logs)
752    }
753}
754
755/// All active filters
756#[derive(Debug, Clone, Default)]
757pub struct ActiveFilters<T> {
758    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
759}
760
761impl<T> ActiveFilters<T> {
762    /// Returns an empty instance.
763    pub fn new() -> Self {
764        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
765    }
766
767    /// Returns `true` if a filter with the given id exists.
768    pub async fn contains(&self, id: &FilterId) -> bool {
769        self.inner.lock().await.contains_key(id)
770    }
771
772    /// Returns the number of currently active filters.
773    pub async fn len(&self) -> usize {
774        self.inner.lock().await.len()
775    }
776
777    /// Returns `true` if there are no active filters.
778    pub async fn is_empty(&self) -> bool {
779        self.inner.lock().await.is_empty()
780    }
781
782    /// Returns all active filter ids.
783    pub async fn ids(&self) -> Vec<FilterId> {
784        self.inner.lock().await.keys().cloned().collect()
785    }
786}
787
/// An installed filter
#[derive(Debug)]
struct ActiveFilter<T> {
    /// The block from which the next poll starts fetching changes.
    ///
    /// Set to the best block number at installation and advanced to `best block + 1` by a
    /// poll that fetches changes.
    block: u64,
    /// Last time this filter was polled.
    ///
    /// Compared against the stale filter TTL when evicting stale filters.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
798
799/// A receiver for pending transactions that returns all new transactions since the last poll.
800#[derive(Debug, Clone)]
801struct PendingTransactionsReceiver {
802    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
803}
804
805impl PendingTransactionsReceiver {
806    fn new(receiver: Receiver<TxHash>) -> Self {
807        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
808    }
809
810    /// Returns all new pending transactions received since the last poll.
811    async fn drain<T>(&self) -> FilterChanges<T> {
812        let mut pending_txs = Vec::new();
813        let mut prepared_stream = self.txs_receiver.lock().await;
814
815        while let Ok(tx_hash) = prepared_stream.try_recv() {
816            pending_txs.push(tx_hash);
817        }
818
819        // Convert the vector of hashes into FilterChanges::Hashes
820        FilterChanges::Hashes(pending_txs)
821    }
822}
823
824/// A structure to manage and provide access to a stream of full transaction details.
825#[derive(Debug, Clone)]
826struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
827    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
828    converter: TxCompat,
829}
830
831impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
832where
833    T: PoolTransaction + 'static,
834    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
835{
836    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
837    fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
838        Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
839    }
840
841    /// Returns all new pending transactions received since the last poll.
842    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
843        let mut pending_txs = Vec::new();
844        let mut prepared_stream = self.txs_stream.lock().await;
845
846        while let Ok(tx) = prepared_stream.try_recv() {
847            match self.converter.fill_pending(tx.transaction.to_consensus()) {
848                Ok(tx) => pending_txs.push(tx),
849                Err(err) => {
850                    error!(target: "rpc",
851                        %err,
852                        "Failed to fill txn with block context"
853                    );
854                }
855            }
856        }
857        FilterChanges::Transactions(pending_txs)
858    }
859}
860
861/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
862#[async_trait]
863trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
864    async fn drain(&self) -> FilterChanges<T>;
865}
866
867#[async_trait]
868impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
869    for FullTransactionsReceiver<T, TxCompat>
870where
871    T: PoolTransaction + 'static,
872    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
873{
874    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
875        Self::drain(self).await
876    }
877}
878
879/// Represents the kind of pending transaction data that can be retrieved.
880///
881/// This enum differentiates between two kinds of pending transaction data:
882/// - Just the transaction hashes.
883/// - Full transaction details.
884#[derive(Debug, Clone)]
885enum PendingTransactionKind<T> {
886    Hashes(PendingTransactionsReceiver),
887    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
888}
889
890impl<T: 'static> PendingTransactionKind<T> {
891    async fn drain(&self) -> FilterChanges<T> {
892        match self {
893            Self::Hashes(receiver) => receiver.drain().await,
894            Self::FullTransaction(receiver) => receiver.drain().await,
895        }
896    }
897}
898
899#[derive(Clone, Debug)]
900enum FilterKind<T> {
901    Log(Box<Filter>),
902    Block,
903    PendingTransaction(PendingTransactionKind<T>),
904}
905
/// An iterator that yields _inclusive_ block ranges of a given step size.
///
/// Every yielded `(start, end)` pair satisfies `end - start <= step`, consecutive pairs are
/// adjacent (`next.start == prev.end + 1`), and the last pair is clamped to the end of the input
/// range. An empty input range yields nothing.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block of the whole range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator that splits `range` into chunks spanning at most `step + 1` blocks.
    ///
    /// A `step` so large that the stride `step + 1` would not fit in `usize` is clamped to the
    /// largest value for which it does. Chunks then stay adjacent and never exceed the
    /// requested size, instead of the stride computation overflowing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Widening (or same-width) cast: `usize` is never wider than `u64` on supported targets.
        let max_step = (usize::MAX - 1) as u64;
        let step = step.min(max_step);

        // Chunk starts are `step + 1` apart; the clamp above makes both the cast and the
        // addition lossless.
        let stride = step as usize + 1;

        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        // `start` always lies inside the input range, hence `start <= self.end`.
        let start = self.iter.next()?;
        // Saturate so a chunk starting close to `u64::MAX` is clamped rather than overflowing.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
932
933/// Errors that can occur in the handler implementation
934#[derive(Debug, thiserror::Error)]
935pub enum EthFilterError {
936    /// Filter not found.
937    #[error("filter not found")]
938    FilterNotFound(FilterId),
939    /// Invalid block range.
940    #[error("invalid block range params")]
941    InvalidBlockRangeParams,
942    /// Block range extends beyond current head.
943    #[error("block range extends beyond current head block")]
944    BlockRangeExceedsHead,
945    /// Query scope is too broad.
946    #[error("query exceeds max block range {0}")]
947    QueryExceedsMaxBlocks(u64),
948    /// Query result is too large.
949    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
950    QueryExceedsMaxResults {
951        /// Maximum number of logs allowed per response
952        max_logs: usize,
953        /// Start block of the suggested retry range
954        from_block: u64,
955        /// End block of the suggested retry range (last successfully processed block)
956        to_block: u64,
957    },
958    /// Error serving request in `eth_` namespace.
959    #[error(transparent)]
960    EthAPIError(#[from] EthApiError),
961    /// Error thrown when a spawned task failed to deliver a response.
962    #[error("internal filter error")]
963    InternalError,
964}
965
966impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
967    fn from(err: EthFilterError) -> Self {
968        match err {
969            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
970                jsonrpsee::types::error::INVALID_PARAMS_CODE,
971                "filter not found",
972            ),
973            err @ EthFilterError::InternalError => {
974                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
975            }
976            EthFilterError::EthAPIError(err) => err.into(),
977            err @ (EthFilterError::InvalidBlockRangeParams |
978            EthFilterError::QueryExceedsMaxBlocks(_) |
979            EthFilterError::QueryExceedsMaxResults { .. } |
980            EthFilterError::BlockRangeExceedsHead) => {
981                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
982            }
983        }
984    }
985}
986
987impl From<ProviderError> for EthFilterError {
988    fn from(err: ProviderError) -> Self {
989        Self::EthAPIError(err.into())
990    }
991}
992
993impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
994    fn from(err: logs_utils::FilterBlockRangeError) -> Self {
995        match err {
996            logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
997            logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
998        }
999    }
1000}
1001
1002/// Helper type for the common pattern of returning receipts, block and the original header that is
1003/// a match for the filter.
1004struct ReceiptBlockResult<P>
1005where
1006    P: ReceiptProvider + BlockReader,
1007{
1008    /// We always need the entire receipts for the matching block.
1009    receipts: Arc<Vec<ProviderReceipt<P>>>,
1010    /// Block can be optional and we can fetch it lazily when needed.
1011    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
1012    /// The header of the block.
1013    header: SealedHeader<<P as HeaderProvider>::Header>,
1014}
1015
1016/// Represents different modes for processing block ranges when filtering logs
1017enum RangeMode<
1018    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1019        + EthApiTypes
1020        + LoadReceipt
1021        + EthBlocks
1022        + 'static,
1023> {
1024    /// Use cache-based processing for recent blocks
1025    Cached(CachedMode<Eth>),
1026    /// Use range-based processing for older blocks
1027    Range(RangeBlockMode<Eth>),
1028}
1029
1030impl<
1031        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1032            + EthApiTypes
1033            + LoadReceipt
1034            + EthBlocks
1035            + 'static,
1036    > RangeMode<Eth>
1037{
1038    /// Creates a new `RangeMode`.
1039    fn new(
1040        filter_inner: Arc<EthFilterInner<Eth>>,
1041        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1042        from_block: u64,
1043        to_block: u64,
1044        max_headers_range: u64,
1045        chain_tip: u64,
1046    ) -> Self {
1047        let block_count = to_block - from_block + 1;
1048        let distance_from_tip = chain_tip.saturating_sub(to_block);
1049
1050        // Determine if we should use cached mode based on range characteristics
1051        let use_cached_mode =
1052            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1053
1054        if use_cached_mode && !sealed_headers.is_empty() {
1055            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1056        } else {
1057            Self::Range(RangeBlockMode {
1058                filter_inner,
1059                iter: sealed_headers.into_iter().peekable(),
1060                next: VecDeque::new(),
1061                max_range: max_headers_range as usize,
1062                pending_tasks: FuturesOrdered::new(),
1063            })
1064        }
1065    }
1066
1067    /// Determines whether to use cached mode based on bloom filter matches and range size
1068    const fn should_use_cached_mode(
1069        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1070        block_count: u64,
1071        distance_from_tip: u64,
1072    ) -> bool {
1073        // Headers are already filtered by bloom, so count equals length
1074        let bloom_matches = headers.len();
1075
1076        // Calculate adjusted threshold based on bloom matches
1077        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1078
1079        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1080    }
1081
1082    /// Calculates the adjusted cache threshold based on bloom filter matches
1083    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1084        // Only apply adjustments for larger ranges
1085        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1086            return CACHED_MODE_BLOCK_THRESHOLD;
1087        }
1088
1089        match bloom_matches {
1090            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1091            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1092            _ => CACHED_MODE_BLOCK_THRESHOLD,
1093        }
1094    }
1095
1096    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
1097    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1098        match self {
1099            Self::Cached(cached) => cached.next().await,
1100            Self::Range(range) => range.next().await,
1101        }
1102    }
1103}
1104
1105/// Mode for processing blocks using cache optimization for recent blocks
1106struct CachedMode<
1107    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1108        + EthApiTypes
1109        + LoadReceipt
1110        + EthBlocks
1111        + 'static,
1112> {
1113    filter_inner: Arc<EthFilterInner<Eth>>,
1114    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1115}
1116
1117impl<
1118        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1119            + EthApiTypes
1120            + LoadReceipt
1121            + EthBlocks
1122            + 'static,
1123    > CachedMode<Eth>
1124{
1125    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1126        for header in self.headers_iter.by_ref() {
1127            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1128            if let Some((receipts, maybe_block)) =
1129                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1130            {
1131                return Ok(Some(ReceiptBlockResult {
1132                    receipts,
1133                    recovered_block: maybe_block,
1134                    header,
1135                }));
1136            }
1137        }
1138
1139        Ok(None) // No more headers
1140    }
1141}
1142
1143/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1144type ReceiptFetchFuture<P> =
1145    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1146
1147/// Mode for processing blocks using range queries for older blocks
1148struct RangeBlockMode<
1149    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1150        + EthApiTypes
1151        + LoadReceipt
1152        + EthBlocks
1153        + 'static,
1154> {
1155    filter_inner: Arc<EthFilterInner<Eth>>,
1156    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1157    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1158    max_range: usize,
1159    // Stream of ongoing receipt fetching tasks
1160    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1161}
1162
1163impl<
1164        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1165            + EthApiTypes
1166            + LoadReceipt
1167            + EthBlocks
1168            + 'static,
1169    > RangeBlockMode<Eth>
1170{
1171    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1172        loop {
1173            // First, try to return any already processed result from buffer
1174            if let Some(result) = self.next.pop_front() {
1175                return Ok(Some(result));
1176            }
1177
1178            // Try to get a completed task result if there are pending tasks
1179            if let Some(task_result) = self.pending_tasks.next().await {
1180                self.next.extend(task_result?);
1181                continue;
1182            }
1183
1184            // No pending tasks - try to generate more work
1185            let Some(next_header) = self.iter.next() else {
1186                // No more headers to process
1187                return Ok(None);
1188            };
1189
1190            let mut range_headers = Vec::with_capacity(self.max_range);
1191            range_headers.push(next_header);
1192
1193            // Collect consecutive blocks up to max_range size
1194            while range_headers.len() < self.max_range {
1195                let Some(peeked) = self.iter.peek() else { break };
1196                let Some(last_header) = range_headers.last() else { break };
1197
1198                let expected_next = last_header.number() + 1;
1199                if peeked.number() != expected_next {
1200                    trace!(
1201                        target: "rpc::eth::filter",
1202                        last_block = last_header.number(),
1203                        next_block = peeked.number(),
1204                        expected = expected_next,
1205                        range_size = range_headers.len(),
1206                        "Non-consecutive block detected, stopping range collection"
1207                    );
1208                    break; // Non-consecutive block, stop here
1209                }
1210
1211                let Some(next_header) = self.iter.next() else { break };
1212                range_headers.push(next_header);
1213            }
1214
1215            // Check if we should use parallel processing for large ranges
1216            let remaining_headers = self.iter.len() + range_headers.len();
1217            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1218                self.spawn_parallel_tasks(range_headers);
1219                // Continue loop to await the spawned tasks
1220            } else {
1221                // Process small range sequentially and add results to buffer
1222                if let Some(result) = self.process_small_range(range_headers).await? {
1223                    return Ok(Some(result));
1224                }
1225                // Continue loop to check for more work
1226            }
1227        }
1228    }
1229
1230    /// Process a small range of headers sequentially
1231    ///
1232    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1233    async fn process_small_range(
1234        &mut self,
1235        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1236    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1237        // Process each header individually to avoid queuing for all receipts
1238        for header in range_headers {
1239            // First check if already cached to avoid unnecessary provider calls
1240            let (maybe_block, maybe_receipts) = self
1241                .filter_inner
1242                .eth_cache()
1243                .maybe_cached_block_and_receipts(header.hash())
1244                .await?;
1245
1246            let receipts = match maybe_receipts {
1247                Some(receipts) => receipts,
1248                None => {
1249                    // Not cached - fetch directly from provider
1250                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1251                        Some(receipts) => Arc::new(receipts),
1252                        None => continue, // No receipts found
1253                    }
1254                }
1255            };
1256
1257            if !receipts.is_empty() {
1258                self.next.push_back(ReceiptBlockResult {
1259                    receipts,
1260                    recovered_block: maybe_block,
1261                    header,
1262                });
1263            }
1264        }
1265
1266        Ok(self.next.pop_front())
1267    }
1268
1269    /// Spawn parallel tasks for processing a large range of headers
1270    ///
1271    /// This is used when the remaining headers count is at or above
1272    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1273    fn spawn_parallel_tasks(
1274        &mut self,
1275        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1276    ) {
1277        // Split headers into chunks
1278        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1279        let header_chunks = range_headers
1280            .into_iter()
1281            .chunks(chunk_size)
1282            .into_iter()
1283            .map(|chunk| chunk.collect::<Vec<_>>())
1284            .collect::<Vec<_>>();
1285
1286        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1287        for chunk_headers in header_chunks {
1288            let filter_inner = self.filter_inner.clone();
1289            let chunk_task = Box::pin(async move {
1290                let chunk_task = tokio::task::spawn_blocking(move || {
1291                    let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1292
1293                    for header in chunk_headers {
1294                        // Fetch directly from provider - RangeMode is used for older blocks
1295                        // unlikely to be cached
1296                        let receipts = match filter_inner
1297                            .provider()
1298                            .receipts_by_block(header.hash().into())?
1299                        {
1300                            Some(receipts) => Arc::new(receipts),
1301                            None => continue, // No receipts found
1302                        };
1303
1304                        if !receipts.is_empty() {
1305                            chunk_results.push(ReceiptBlockResult {
1306                                receipts,
1307                                recovered_block: None,
1308                                header,
1309                            });
1310                        }
1311                    }
1312
1313                    Ok(chunk_results)
1314                });
1315
1316                // Await the blocking task and handle the result
1317                match chunk_task.await {
1318                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1319                    Ok(Err(e)) => Err(e),
1320                    Err(join_err) => {
1321                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1322                        Err(EthFilterError::InternalError)
1323                    }
1324                }
1325            });
1326
1327            self.pending_tasks.push_back(chunk_task);
1328        }
1329    }
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334    use super::*;
1335    use crate::{eth::EthApi, EthApiBuilder};
1336    use alloy_network::Ethereum;
1337    use alloy_primitives::FixedBytes;
1338    use rand::Rng;
1339    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1340    use reth_ethereum_primitives::TxType;
1341    use reth_evm_ethereum::EthEvmConfig;
1342    use reth_network_api::noop::NoopNetwork;
1343    use reth_provider::test_utils::MockEthProvider;
1344    use reth_rpc_convert::RpcConverter;
1345    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1346    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1347    use reth_tasks::Runtime;
1348    use reth_testing_utils::generators;
1349    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1350    use std::{collections::VecDeque, sync::Arc};
1351
1352    #[test]
1353    fn test_block_range_iter() {
1354        let mut rng = generators::rng();
1355
1356        let start = rng.random::<u32>() as u64;
1357        let end = start.saturating_add(rng.random::<u32>() as u64);
1358        let step = rng.random::<u16>() as u64;
1359        let range = start..=end;
1360        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1361        let (from, mut end) = iter.next().unwrap();
1362        assert_eq!(from, start);
1363        assert_eq!(end, (from + step).min(*range.end()));
1364
1365        for (next_from, next_end) in iter {
1366            // ensure range starts with previous end + 1
1367            assert_eq!(next_from, end + 1);
1368            end = next_end;
1369        }
1370
1371        assert_eq!(end, *range.end());
1372    }
1373
1374    // Helper function to create a test EthApi instance
1375    #[expect(clippy::type_complexity)]
1376    fn build_test_eth_api(
1377        provider: MockEthProvider,
1378    ) -> EthApi<
1379        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1380        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1381    > {
1382        EthApiBuilder::new(
1383            provider.clone(),
1384            testing_pool(),
1385            NoopNetwork::default(),
1386            EthEvmConfig::new(provider.chain_spec()),
1387        )
1388        .build()
1389    }
1390
1391    #[tokio::test]
1392    async fn test_range_block_mode_empty_range() {
1393        let provider = MockEthProvider::default();
1394        let eth_api = build_test_eth_api(provider);
1395
1396        let eth_filter =
1397            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1398        let filter_inner = eth_filter.inner;
1399
1400        let headers = vec![];
1401        let max_range = 100;
1402
1403        let mut range_mode = RangeBlockMode {
1404            filter_inner,
1405            iter: headers.into_iter().peekable(),
1406            next: VecDeque::new(),
1407            max_range,
1408            pending_tasks: FuturesOrdered::new(),
1409        };
1410
1411        let result = range_mode.next().await;
1412        assert!(result.is_ok());
1413        assert!(result.unwrap().is_none());
1414    }
1415
1416    #[tokio::test]
1417    async fn test_range_block_mode_queued_results_priority() {
1418        let provider = MockEthProvider::default();
1419        let eth_api = build_test_eth_api(provider);
1420
1421        let eth_filter =
1422            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1423        let filter_inner = eth_filter.inner;
1424
1425        let headers = vec![
1426            SealedHeader::new(
1427                alloy_consensus::Header { number: 100, ..Default::default() },
1428                FixedBytes::random(),
1429            ),
1430            SealedHeader::new(
1431                alloy_consensus::Header { number: 101, ..Default::default() },
1432                FixedBytes::random(),
1433            ),
1434        ];
1435
1436        // create specific mock results to test ordering
1437        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1438        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1439
1440        // create mock receipts to test receipt handling
1441        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1442            tx_type: TxType::Legacy,
1443            cumulative_gas_used: 100_000,
1444            logs: vec![],
1445            success: true,
1446        };
1447        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1448            tx_type: TxType::Eip1559,
1449            cumulative_gas_used: 200_000,
1450            logs: vec![],
1451            success: true,
1452        };
1453        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1454            tx_type: TxType::Eip2930,
1455            cumulative_gas_used: 150_000,
1456            logs: vec![],
1457            success: false, // Different success status
1458        };
1459
1460        let mock_result_1 = ReceiptBlockResult {
1461            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1462            recovered_block: None,
1463            header: SealedHeader::new(
1464                alloy_consensus::Header { number: 42, ..Default::default() },
1465                expected_block_hash_1,
1466            ),
1467        };
1468
1469        let mock_result_2 = ReceiptBlockResult {
1470            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1471            recovered_block: None,
1472            header: SealedHeader::new(
1473                alloy_consensus::Header { number: 43, ..Default::default() },
1474                expected_block_hash_2,
1475            ),
1476        };
1477
1478        let mut range_mode = RangeBlockMode {
1479            filter_inner,
1480            iter: headers.into_iter().peekable(),
1481            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1482            max_range: 100,
1483            pending_tasks: FuturesOrdered::new(),
1484        };
1485
1486        // first call should return the first queued result (FIFO order)
1487        let result1 = range_mode.next().await;
1488        assert!(result1.is_ok());
1489        let receipt_result1 = result1.unwrap().unwrap();
1490        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1491        assert_eq!(receipt_result1.header.number, 42);
1492
1493        // verify receipts
1494        assert_eq!(receipt_result1.receipts.len(), 2);
1495        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1496        assert_eq!(
1497            receipt_result1.receipts[0].cumulative_gas_used,
1498            mock_receipt_1.cumulative_gas_used
1499        );
1500        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1501        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1502        assert_eq!(
1503            receipt_result1.receipts[1].cumulative_gas_used,
1504            mock_receipt_2.cumulative_gas_used
1505        );
1506        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1507
1508        // second call should return the second queued result
1509        let result2 = range_mode.next().await;
1510        assert!(result2.is_ok());
1511        let receipt_result2 = result2.unwrap().unwrap();
1512        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1513        assert_eq!(receipt_result2.header.number, 43);
1514
1515        // verify receipts
1516        assert_eq!(receipt_result2.receipts.len(), 1);
1517        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1518        assert_eq!(
1519            receipt_result2.receipts[0].cumulative_gas_used,
1520            mock_receipt_3.cumulative_gas_used
1521        );
1522        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1523
1524        // queue should now be empty
1525        assert!(range_mode.next.is_empty());
1526
1527        let result3 = range_mode.next().await;
1528        assert!(result3.is_ok());
1529    }
1530
1531    #[tokio::test]
1532    async fn test_range_block_mode_single_block_no_receipts() {
1533        let provider = MockEthProvider::default();
1534        let eth_api = build_test_eth_api(provider);
1535
1536        let eth_filter =
1537            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1538        let filter_inner = eth_filter.inner;
1539
1540        let headers = vec![SealedHeader::new(
1541            alloy_consensus::Header { number: 100, ..Default::default() },
1542            FixedBytes::random(),
1543        )];
1544
1545        let mut range_mode = RangeBlockMode {
1546            filter_inner,
1547            iter: headers.into_iter().peekable(),
1548            next: VecDeque::new(),
1549            max_range: 100,
1550            pending_tasks: FuturesOrdered::new(),
1551        };
1552
1553        let result = range_mode.next().await;
1554        assert!(result.is_ok());
1555    }
1556
1557    #[tokio::test]
1558    async fn test_range_block_mode_provider_receipts() {
1559        let provider = MockEthProvider::default();
1560
1561        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1562        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1563        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1564
1565        let block_hash_1 = FixedBytes::random();
1566        let block_hash_2 = FixedBytes::random();
1567        let block_hash_3 = FixedBytes::random();
1568
1569        provider.add_header(block_hash_1, header_1.clone());
1570        provider.add_header(block_hash_2, header_2.clone());
1571        provider.add_header(block_hash_3, header_3.clone());
1572
1573        // create mock receipts to test provider fetching with mock logs
1574        let mock_log = alloy_primitives::Log {
1575            address: alloy_primitives::Address::ZERO,
1576            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1577        };
1578
1579        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1580            tx_type: TxType::Legacy,
1581            cumulative_gas_used: 21_000,
1582            logs: vec![mock_log.clone()],
1583            success: true,
1584        };
1585        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1586            tx_type: TxType::Eip1559,
1587            cumulative_gas_used: 42_000,
1588            logs: vec![mock_log.clone()],
1589            success: true,
1590        };
1591        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1592            tx_type: TxType::Eip2930,
1593            cumulative_gas_used: 30_000,
1594            logs: vec![mock_log.clone()],
1595            success: false,
1596        };
1597
1598        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1599        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1600
1601        let eth_api = build_test_eth_api(provider);
1602
1603        let eth_filter =
1604            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1605        let filter_inner = eth_filter.inner;
1606
1607        let headers = vec![
1608            SealedHeader::new(header_1, block_hash_1),
1609            SealedHeader::new(header_2, block_hash_2),
1610            SealedHeader::new(header_3, block_hash_3),
1611        ];
1612
1613        let mut range_mode = RangeBlockMode {
1614            filter_inner,
1615            iter: headers.into_iter().peekable(),
1616            next: VecDeque::new(),
1617            max_range: 3, // include the 3 blocks in the first queried results
1618            pending_tasks: FuturesOrdered::new(),
1619        };
1620
1621        // first call should fetch receipts from provider and return first block with receipts
1622        let result = range_mode.next().await;
1623        assert!(result.is_ok());
1624        let receipt_result = result.unwrap().unwrap();
1625
1626        assert_eq!(receipt_result.header.hash(), block_hash_1);
1627        assert_eq!(receipt_result.header.number, 100);
1628        assert_eq!(receipt_result.receipts.len(), 2);
1629
1630        // verify receipts
1631        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1632        assert_eq!(
1633            receipt_result.receipts[0].cumulative_gas_used,
1634            receipt_100_1.cumulative_gas_used
1635        );
1636        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1637
1638        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1639        assert_eq!(
1640            receipt_result.receipts[1].cumulative_gas_used,
1641            receipt_100_2.cumulative_gas_used
1642        );
1643        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1644
1645        // second call should return the second block with receipts
1646        let result2 = range_mode.next().await;
1647        assert!(result2.is_ok());
1648        let receipt_result2 = result2.unwrap().unwrap();
1649
1650        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1651        assert_eq!(receipt_result2.header.number, 101);
1652        assert_eq!(receipt_result2.receipts.len(), 1);
1653
1654        // verify receipts
1655        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1656        assert_eq!(
1657            receipt_result2.receipts[0].cumulative_gas_used,
1658            receipt_101_1.cumulative_gas_used
1659        );
1660        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1661
1662        // third call should return None since no more blocks with receipts
1663        let result3 = range_mode.next().await;
1664        assert!(result3.is_ok());
1665        assert!(result3.unwrap().is_none());
1666    }
1667
1668    #[tokio::test]
1669    async fn test_range_block_mode_iterator_exhaustion() {
1670        let provider = MockEthProvider::default();
1671
1672        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1673        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1674
1675        let block_hash_100 = FixedBytes::random();
1676        let block_hash_101 = FixedBytes::random();
1677
1678        // Associate headers with hashes first
1679        provider.add_header(block_hash_100, header_100.clone());
1680        provider.add_header(block_hash_101, header_101.clone());
1681
1682        // Add mock receipts so headers are actually processed
1683        let mock_receipt = reth_ethereum_primitives::Receipt {
1684            tx_type: TxType::Legacy,
1685            cumulative_gas_used: 21_000,
1686            logs: vec![],
1687            success: true,
1688        };
1689        provider.add_receipts(100, vec![mock_receipt.clone()]);
1690        provider.add_receipts(101, vec![mock_receipt.clone()]);
1691
1692        let eth_api = build_test_eth_api(provider);
1693
1694        let eth_filter =
1695            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1696        let filter_inner = eth_filter.inner;
1697
1698        let headers = vec![
1699            SealedHeader::new(header_100, block_hash_100),
1700            SealedHeader::new(header_101, block_hash_101),
1701        ];
1702
1703        let mut range_mode = RangeBlockMode {
1704            filter_inner,
1705            iter: headers.into_iter().peekable(),
1706            next: VecDeque::new(),
1707            max_range: 1,
1708            pending_tasks: FuturesOrdered::new(),
1709        };
1710
1711        let result1 = range_mode.next().await;
1712        assert!(result1.is_ok());
1713        assert!(result1.unwrap().is_some()); // Should have processed block 100
1714
1715        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1716
1717        let result2 = range_mode.next().await;
1718        assert!(result2.is_ok());
1719        assert!(result2.unwrap().is_some()); // Should have processed block 101
1720
1721        // now iterator should be exhausted
1722        assert!(range_mode.iter.peek().is_none());
1723
1724        // further calls should return None
1725        let result3 = range_mode.next().await;
1726        assert!(result3.is_ok());
1727        assert!(result3.unwrap().is_none());
1728    }
1729
1730    #[tokio::test]
1731    async fn test_cached_mode_with_mock_receipts() {
1732        // create test data
1733        let test_hash = FixedBytes::from([42u8; 32]);
1734        let test_block_number = 100u64;
1735        let test_header = SealedHeader::new(
1736            alloy_consensus::Header {
1737                number: test_block_number,
1738                gas_used: 50_000,
1739                ..Default::default()
1740            },
1741            test_hash,
1742        );
1743
1744        // add a mock receipt to the provider with a mock log
1745        let mock_log = alloy_primitives::Log {
1746            address: alloy_primitives::Address::ZERO,
1747            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1748        };
1749
1750        let mock_receipt = reth_ethereum_primitives::Receipt {
1751            tx_type: TxType::Legacy,
1752            cumulative_gas_used: 21_000,
1753            logs: vec![mock_log],
1754            success: true,
1755        };
1756
1757        let provider = MockEthProvider::default();
1758        provider.add_header(test_hash, test_header.header().clone());
1759        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1760
1761        let eth_api = build_test_eth_api(provider);
1762        let eth_filter =
1763            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1764        let filter_inner = eth_filter.inner;
1765
1766        let headers = vec![test_header.clone()];
1767
1768        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1769
1770        // should find the receipt from provider fallback (cache will be empty)
1771        let result = cached_mode.next().await.expect("next should succeed");
1772        let receipt_block_result = result.expect("should have receipt result");
1773        assert_eq!(receipt_block_result.header.hash(), test_hash);
1774        assert_eq!(receipt_block_result.header.number, test_block_number);
1775        assert_eq!(receipt_block_result.receipts.len(), 1);
1776        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1777        assert_eq!(
1778            receipt_block_result.receipts[0].cumulative_gas_used,
1779            mock_receipt.cumulative_gas_used
1780        );
1781        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1782
1783        // iterator should be exhausted
1784        let result2 = cached_mode.next().await;
1785        assert!(result2.is_ok());
1786        assert!(result2.unwrap().is_none());
1787    }
1788
1789    #[tokio::test]
1790    async fn test_cached_mode_empty_headers() {
1791        let provider = MockEthProvider::default();
1792        let eth_api = build_test_eth_api(provider);
1793
1794        let eth_filter =
1795            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1796        let filter_inner = eth_filter.inner;
1797
1798        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1799
1800        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1801
1802        // should immediately return None for empty headers
1803        let result = cached_mode.next().await.expect("next should succeed");
1804        assert!(result.is_none());
1805    }
1806
1807    #[tokio::test]
1808    async fn test_non_consecutive_headers_after_bloom_filter() {
1809        let provider = MockEthProvider::default();
1810
1811        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1812        let mut expected_hashes = vec![];
1813        let mut prev_hash = alloy_primitives::B256::default();
1814
1815        // Create a transaction for blocks that will have receipts
1816        use alloy_consensus::TxLegacy;
1817        use reth_ethereum_primitives::{TransactionSigned, TxType};
1818
1819        let tx_inner = TxLegacy {
1820            chain_id: Some(1),
1821            nonce: 0,
1822            gas_price: 21_000,
1823            gas_limit: 21_000,
1824            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1825            value: alloy_primitives::U256::ZERO,
1826            input: alloy_primitives::Bytes::new(),
1827        };
1828        let signature = alloy_primitives::Signature::test_signature();
1829        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1830
1831        for i in 100u64..=103 {
1832            let header = alloy_consensus::Header {
1833                number: i,
1834                parent_hash: prev_hash,
1835                // Set bloom to match filter only for blocks 100 and 102
1836                logs_bloom: if i == 100 || i == 102 {
1837                    alloy_primitives::Bloom::from([1u8; 256])
1838                } else {
1839                    alloy_primitives::Bloom::default()
1840                },
1841                ..Default::default()
1842            };
1843
1844            let hash = header.hash_slow();
1845            expected_hashes.push(hash);
1846            prev_hash = hash;
1847
1848            // Add transaction to blocks that will have receipts (100 and 102)
1849            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1850
1851            let block = reth_ethereum_primitives::Block {
1852                header,
1853                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1854            };
1855            provider.add_block(hash, block);
1856        }
1857
1858        // Add receipts with logs only to blocks that match bloom
1859        let mock_log = alloy_primitives::Log {
1860            address: alloy_primitives::Address::ZERO,
1861            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1862        };
1863
1864        let receipt = reth_ethereum_primitives::Receipt {
1865            tx_type: TxType::Legacy,
1866            cumulative_gas_used: 21_000,
1867            logs: vec![mock_log],
1868            success: true,
1869        };
1870
1871        provider.add_receipts(100, vec![receipt.clone()]);
1872        provider.add_receipts(101, vec![]);
1873        provider.add_receipts(102, vec![receipt.clone()]);
1874        provider.add_receipts(103, vec![]);
1875
1876        // Add block body indices for each block so receipts can be fetched
1877        use reth_db_api::models::StoredBlockBodyIndices;
1878        provider
1879            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1880        provider
1881            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1882        provider
1883            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1884        provider
1885            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1886
1887        let eth_api = build_test_eth_api(provider);
1888        let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1889
1890        // Use default filter which will match any non-empty bloom
1891        let filter = Filter::default();
1892
1893        // Get logs in the range - this will trigger the bloom filtering
1894        let logs = eth_filter
1895            .inner
1896            .clone()
1897            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1898            .await
1899            .expect("should succeed");
1900
1901        // We should get logs from blocks 100 and 102 only (bloom filtered)
1902        assert_eq!(logs.len(), 2);
1903
1904        assert_eq!(logs[0].block_number, Some(100));
1905        assert_eq!(logs[1].block_number, Some(102));
1906
1907        // Each block hash should be the hash of its own header, not derived from any other header
1908        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1909        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1910    }
1911}