reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{Sealable, TxHash};
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::{
11    future::TryFutureExt,
12    stream::{FuturesOrdered, StreamExt},
13    Future,
14};
15use itertools::Itertools;
16use jsonrpsee::{core::RpcResult, server::IdProvider};
17use reth_errors::ProviderError;
18use reth_primitives_traits::{NodePrimitives, SealedHeader};
19use reth_rpc_eth_api::{
20    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
21    RpcNodeCoreExt, RpcTransaction,
22};
23use reth_rpc_eth_types::{
24    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
25    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
26};
27use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
28use reth_storage_api::{
29    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
30    ProviderReceipt, ReceiptProvider,
31};
32use reth_tasks::TaskSpawner;
33use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
34use std::{
35    collections::{HashMap, VecDeque},
36    fmt,
37    iter::{Peekable, StepBy},
38    ops::RangeInclusive,
39    pin::Pin,
40    sync::Arc,
41    time::{Duration, Instant},
42};
43use tokio::{
44    sync::{mpsc::Receiver, oneshot, Mutex},
45    time::MissedTickBehavior,
46};
47use tracing::{debug, error, trace};
48
49impl<Eth> EngineEthFilter for EthFilter<Eth>
50where
51    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
52{
53    /// Returns logs matching given filter object, no query limits
54    fn logs(
55        &self,
56        filter: Filter,
57        limits: QueryLimits,
58    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
59        trace!(target: "rpc::eth", "Serving eth_getLogs");
60        self.logs_for_filter(filter, limits).map_err(|e| e.into())
61    }
62}
63
/// Threshold for deciding between cached and range mode processing.
// NOTE(review): the name suggests the unit is blocks; the consumer is outside the visible part of
// this file — confirm.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Threshold for bloom filter matches that triggers reduced caching.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Threshold for bloom filter matches that triggers moderately reduced caching.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum block count to apply bloom filter match adjustments.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// The maximum number of headers we read at once when handling a range filter.
///
/// Every filter instance created through `EthFilter::new` uses this value as its
/// `max_headers_range`.
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530 bytes per header this is ~500 kB

/// Threshold for enabling parallel processing in range mode.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Default concurrency for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
84
85/// `Eth` filter RPC implementation.
86///
87/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
88pub struct EthFilter<Eth: EthApiTypes> {
89    /// All nested fields bundled together
90    inner: Arc<EthFilterInner<Eth>>,
91}
92
93impl<Eth> Clone for EthFilter<Eth>
94where
95    Eth: EthApiTypes,
96{
97    fn clone(&self) -> Self {
98        Self { inner: self.inner.clone() }
99    }
100}
101
102impl<Eth> EthFilter<Eth>
103where
104    Eth: EthApiTypes + 'static,
105{
106    /// Creates a new, shareable instance.
107    ///
108    /// This uses the given pool to get notified about new transactions, the provider to interact
109    /// with the blockchain, the cache to fetch cacheable data, like the logs.
110    ///
111    /// See also [`EthFilterConfig`].
112    ///
113    /// This also spawns a task that periodically clears stale filters.
114    ///
115    /// # Create a new instance with [`EthApi`](crate::EthApi)
116    ///
117    /// ```no_run
118    /// use reth_evm_ethereum::EthEvmConfig;
119    /// use reth_network_api::noop::NoopNetwork;
120    /// use reth_provider::noop::NoopProvider;
121    /// use reth_rpc::{EthApi, EthFilter};
122    /// use reth_tasks::TokioTaskExecutor;
123    /// use reth_transaction_pool::noop::NoopTransactionPool;
124    /// let eth_api = EthApi::builder(
125    ///     NoopProvider::default(),
126    ///     NoopTransactionPool::default(),
127    ///     NoopNetwork::default(),
128    ///     EthEvmConfig::mainnet(),
129    /// )
130    /// .build();
131    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
132    /// ```
133    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
134        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
135            config;
136        let inner = EthFilterInner {
137            eth_api,
138            active_filters: ActiveFilters::new(),
139            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
140            max_headers_range: MAX_HEADERS_RANGE,
141            task_spawner,
142            stale_filter_ttl,
143            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
144        };
145
146        let eth_filter = Self { inner: Arc::new(inner) };
147
148        let this = eth_filter.clone();
149        eth_filter.inner.task_spawner.spawn_critical(
150            "eth-filters_stale-filters-clean",
151            Box::pin(async move {
152                this.watch_and_clear_stale_filters().await;
153            }),
154        );
155
156        eth_filter
157    }
158
159    /// Returns all currently active filters
160    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
161        &self.inner.active_filters
162    }
163
164    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
165    /// Nonetheless, this endless future frees the thread at every await point.
166    async fn watch_and_clear_stale_filters(&self) {
167        let mut interval = tokio::time::interval_at(
168            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
169            self.inner.stale_filter_ttl,
170        );
171        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
172        loop {
173            interval.tick().await;
174            self.clear_stale_filters(Instant::now()).await;
175        }
176    }
177
178    /// Clears all filters that have not been polled for longer than the configured
179    /// `stale_filter_ttl` at the given instant.
180    pub async fn clear_stale_filters(&self, now: Instant) {
181        trace!(target: "rpc::eth", "clear stale filters");
182        self.active_filters().inner.lock().await.retain(|id, filter| {
183            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
184
185            if !is_valid {
186                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
187            }
188
189            is_valid
190        })
191    }
192}
193
194impl<Eth> EthFilter<Eth>
195where
196    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
197{
198    /// Access the underlying provider.
199    fn provider(&self) -> &Eth::Provider {
200        self.inner.eth_api.provider()
201    }
202
203    /// Access the underlying pool.
204    fn pool(&self) -> &Eth::Pool {
205        self.inner.eth_api.pool()
206    }
207
208    /// Returns all the filter changes for the given id, if any
209    pub async fn filter_changes(
210        &self,
211        id: FilterId,
212    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
213        let info = self.provider().chain_info()?;
214        let best_number = info.best_number;
215
216        // start_block is the block from which we should start fetching changes, the next block from
217        // the last time changes were polled, in other words the best block at last poll + 1
218        let (start_block, kind) = {
219            let mut filters = self.inner.active_filters.inner.lock().await;
220            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
221
222            if filter.block > best_number {
223                // no new blocks since the last poll
224                return Ok(FilterChanges::Empty)
225            }
226
227            // update filter
228            // we fetch all changes from [filter.block..best_block], so we advance the filter's
229            // block to `best_block +1`, the next from which we should start fetching changes again
230            let mut block = best_number + 1;
231            std::mem::swap(&mut filter.block, &mut block);
232            filter.last_poll_timestamp = Instant::now();
233
234            (block, filter.kind.clone())
235        };
236
237        match kind {
238            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
239            FilterKind::Block => {
240                // Note: we need to fetch the block hashes from inclusive range
241                // [start_block..best_block]
242                let end_block = best_number + 1;
243                let block_hashes =
244                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
245                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
246                    )?;
247                Ok(FilterChanges::Hashes(block_hashes))
248            }
249            FilterKind::Log(filter) => {
250                let (from_block_number, to_block_number) = match filter.block_option {
251                    FilterBlockOption::Range { from_block, to_block } => {
252                        let from = from_block
253                            .map(|num| self.provider().convert_block_number(num))
254                            .transpose()?
255                            .flatten();
256                        let to = to_block
257                            .map(|num| self.provider().convert_block_number(num))
258                            .transpose()?
259                            .flatten();
260                        logs_utils::get_filter_block_range(from, to, start_block, info)
261                    }
262                    FilterBlockOption::AtBlockHash(_) => {
263                        // blockHash is equivalent to fromBlock = toBlock = the block number with
264                        // hash blockHash
265                        // get_logs_in_block_range is inclusive
266                        (start_block, best_number)
267                    }
268                };
269                let logs = self
270                    .inner
271                    .clone()
272                    .get_logs_in_block_range(
273                        *filter,
274                        from_block_number,
275                        to_block_number,
276                        self.inner.query_limits,
277                    )
278                    .await?;
279                Ok(FilterChanges::Logs(logs))
280            }
281        }
282    }
283
284    /// Returns an array of all logs matching filter with given id.
285    ///
286    /// Returns an error if no matching log filter exists.
287    ///
288    /// Handler for `eth_getFilterLogs`
289    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
290        let filter = {
291            let filters = self.inner.active_filters.inner.lock().await;
292            if let FilterKind::Log(ref filter) =
293                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
294            {
295                *filter.clone()
296            } else {
297                // Not a log filter
298                return Err(EthFilterError::FilterNotFound(id))
299            }
300        };
301
302        self.logs_for_filter(filter, self.inner.query_limits).await
303    }
304
305    /// Returns logs matching given filter object.
306    async fn logs_for_filter(
307        &self,
308        filter: Filter,
309        limits: QueryLimits,
310    ) -> Result<Vec<Log>, EthFilterError> {
311        self.inner.clone().logs_for_filter(filter, limits).await
312    }
313}
314
315#[async_trait]
316impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
317where
318    Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
319{
320    /// Handler for `eth_newFilter`
321    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
322        trace!(target: "rpc::eth", "Serving eth_newFilter");
323        self.inner
324            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
325            .await
326    }
327
328    /// Handler for `eth_newBlockFilter`
329    async fn new_block_filter(&self) -> RpcResult<FilterId> {
330        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
331        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
332    }
333
334    /// Handler for `eth_newPendingTransactionFilter`
335    async fn new_pending_transaction_filter(
336        &self,
337        kind: Option<PendingTransactionFilterKind>,
338    ) -> RpcResult<FilterId> {
339        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
340
341        let transaction_kind = match kind.unwrap_or_default() {
342            PendingTransactionFilterKind::Hashes => {
343                let receiver = self.pool().pending_transactions_listener();
344                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
345                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
346            }
347            PendingTransactionFilterKind::Full => {
348                let stream = self.pool().new_pending_pool_transactions_listener();
349                let full_txs_receiver = FullTransactionsReceiver::new(
350                    stream,
351                    dyn_clone::clone(self.inner.eth_api.tx_resp_builder()),
352                );
353                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
354                    full_txs_receiver,
355                )))
356            }
357        };
358
359        //let filter = FilterKind::PendingTransaction(transaction_kind);
360
361        // Install the filter and propagate any errors
362        self.inner.install_filter(transaction_kind).await
363    }
364
365    /// Handler for `eth_getFilterChanges`
366    async fn filter_changes(
367        &self,
368        id: FilterId,
369    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
370        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
371        Ok(Self::filter_changes(self, id).await?)
372    }
373
374    /// Returns an array of all logs matching filter with given id.
375    ///
376    /// Returns an error if no matching log filter exists.
377    ///
378    /// Handler for `eth_getFilterLogs`
379    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
380        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
381        Ok(Self::filter_logs(self, id).await?)
382    }
383
384    /// Handler for `eth_uninstallFilter`
385    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
386        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
387        let mut filters = self.inner.active_filters.inner.lock().await;
388        if filters.remove(&id).is_some() {
389            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
390            Ok(true)
391        } else {
392            Ok(false)
393        }
394    }
395
396    /// Returns logs matching given filter object.
397    ///
398    /// Handler for `eth_getLogs`
399    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
400        trace!(target: "rpc::eth", "Serving eth_getLogs");
401        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
402    }
403}
404
405impl<Eth> std::fmt::Debug for EthFilter<Eth>
406where
407    Eth: EthApiTypes,
408{
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        f.debug_struct("EthFilter").finish_non_exhaustive()
411    }
412}
413
/// Container type for everything an `EthFilter` holds.
///
/// `EthFilter` keeps a single instance of this type behind an `Arc`, so all clones of the
/// filter handler operate on the same state.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Limits for logs queries (maximum blocks per filter and maximum logs per response).
    query_limits: QueryLimits,
    /// Maximum number of headers to read at once for range filter
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
432
433impl<Eth> EthFilterInner<Eth>
434where
435    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
436        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
437        + 'static,
438{
439    /// Access the underlying provider.
440    fn provider(&self) -> &Eth::Provider {
441        self.eth_api.provider()
442    }
443
444    /// Access the underlying [`EthStateCache`].
445    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
446        self.eth_api.cache()
447    }
448
449    /// Returns logs matching given filter object.
450    async fn logs_for_filter(
451        self: Arc<Self>,
452        filter: Filter,
453        limits: QueryLimits,
454    ) -> Result<Vec<Log>, EthFilterError> {
455        match filter.block_option {
456            FilterBlockOption::AtBlockHash(block_hash) => {
457                // for all matching logs in the block
458                // get the block header with the hash
459                let header = self
460                    .provider()
461                    .header_by_hash_or_number(block_hash.into())?
462                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
463
464                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
465
466                // we also need to ensure that the receipts are available and return an error if
467                // not, in case the block hash been reorged
468                let (receipts, maybe_block) = self
469                    .eth_cache()
470                    .get_receipts_and_maybe_block(block_num_hash.hash)
471                    .await?
472                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
473
474                let mut all_logs = Vec::new();
475                append_matching_block_logs(
476                    &mut all_logs,
477                    maybe_block
478                        .map(ProviderOrBlock::Block)
479                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
480                    &filter,
481                    block_num_hash,
482                    &receipts,
483                    false,
484                    header.timestamp(),
485                )?;
486
487                Ok(all_logs)
488            }
489            FilterBlockOption::Range { from_block, to_block } => {
490                // compute the range
491                let info = self.provider().chain_info()?;
492
493                // we start at the most recent block if unset in filter
494                let start_block = info.best_number;
495                let from = from_block
496                    .map(|num| self.provider().convert_block_number(num))
497                    .transpose()?
498                    .flatten();
499                let to = to_block
500                    .map(|num| self.provider().convert_block_number(num))
501                    .transpose()?
502                    .flatten();
503
504                if let Some(f) = from &&
505                    f > info.best_number
506                {
507                    // start block higher than local head, can return empty
508                    return Ok(Vec::new());
509                }
510
511                let (from_block_number, to_block_number) =
512                    logs_utils::get_filter_block_range(from, to, start_block, info);
513
514                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
515                    .await
516            }
517        }
518    }
519
520    /// Installs a new filter and returns the new identifier.
521    async fn install_filter(
522        &self,
523        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
524    ) -> RpcResult<FilterId> {
525        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
526        let subscription_id = self.id_provider.next_id();
527
528        let id = match subscription_id {
529            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
530            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
531        };
532        let mut filters = self.active_filters.inner.lock().await;
533        filters.insert(
534            id.clone(),
535            ActiveFilter {
536                block: last_poll_block_number,
537                last_poll_timestamp: Instant::now(),
538                kind,
539            },
540        );
541        Ok(id)
542    }
543
544    /// Returns all logs in the given _inclusive_ range that match the filter
545    ///
546    /// Returns an error if:
547    ///  - underlying database error
548    ///  - amount of matches exceeds configured limit
549    async fn get_logs_in_block_range(
550        self: Arc<Self>,
551        filter: Filter,
552        from_block: u64,
553        to_block: u64,
554        limits: QueryLimits,
555    ) -> Result<Vec<Log>, EthFilterError> {
556        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
557
558        // perform boundary checks first
559        if to_block < from_block {
560            return Err(EthFilterError::InvalidBlockRangeParams)
561        }
562
563        if let Some(max_blocks_per_filter) =
564            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
565        {
566            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
567        }
568
569        let (tx, rx) = oneshot::channel();
570        let this = self.clone();
571        self.task_spawner.spawn_blocking(Box::pin(async move {
572            let res =
573                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
574            let _ = tx.send(res);
575        }));
576
577        rx.await.map_err(|_| EthFilterError::InternalError)?
578    }
579
580    /// Returns all logs in the given _inclusive_ range that match the filter
581    ///
582    /// Note: This function uses a mix of blocking db operations for fetching indices and header
583    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
584    /// This function is considered blocking and should thus be spawned on a blocking task.
585    ///
586    /// Returns an error if:
587    ///  - underlying database error
588    async fn get_logs_in_block_range_inner(
589        self: Arc<Self>,
590        filter: &Filter,
591        from_block: u64,
592        to_block: u64,
593        limits: QueryLimits,
594    ) -> Result<Vec<Log>, EthFilterError> {
595        let mut all_logs = Vec::new();
596        let mut matching_headers = Vec::new();
597
598        // get current chain tip to determine processing mode
599        let chain_tip = self.provider().best_block_number()?;
600
601        // first collect all headers that match the bloom filter for cached mode decision
602        for (from, to) in
603            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
604        {
605            let headers = self.provider().headers_range(from..=to)?;
606
607            let mut headers_iter = headers.into_iter().peekable();
608
609            while let Some(header) = headers_iter.next() {
610                if !filter.matches_bloom(header.logs_bloom()) {
611                    continue
612                }
613
614                let current_number = header.number();
615
616                let block_hash = match headers_iter.peek() {
617                    Some(next_header) if next_header.number() == current_number + 1 => {
618                        // Headers are consecutive, use the more efficient parent_hash
619                        next_header.parent_hash()
620                    }
621                    _ => {
622                        // Headers not consecutive or last header, calculate hash
623                        header.hash_slow()
624                    }
625                };
626
627                matching_headers.push(SealedHeader::new(header, block_hash));
628            }
629        }
630
631        // initialize the appropriate range mode based on collected headers
632        let mut range_mode = RangeMode::new(
633            self.clone(),
634            matching_headers,
635            from_block,
636            to_block,
637            self.max_headers_range,
638            chain_tip,
639        );
640
641        // iterate through the range mode to get receipts and blocks
642        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
643            range_mode.next().await?
644        {
645            let num_hash = header.num_hash();
646            append_matching_block_logs(
647                &mut all_logs,
648                recovered_block
649                    .map(ProviderOrBlock::Block)
650                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
651                filter,
652                num_hash,
653                &receipts,
654                false,
655                header.timestamp(),
656            )?;
657
658            // size check but only if range is multiple blocks, so we always return all
659            // logs of a single block
660            let is_multi_block_range = from_block != to_block;
661            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
662                is_multi_block_range &&
663                all_logs.len() > max_logs_per_response
664            {
665                debug!(
666                    target: "rpc::eth::filter",
667                    logs_found = all_logs.len(),
668                    max_logs_per_response,
669                    from_block,
670                    to_block = num_hash.number.saturating_sub(1),
671                    "Query exceeded max logs per response limit"
672                );
673                return Err(EthFilterError::QueryExceedsMaxResults {
674                    max_logs: max_logs_per_response,
675                    from_block,
676                    to_block: num_hash.number.saturating_sub(1),
677                });
678            }
679        }
680
681        Ok(all_logs)
682    }
683}
684
685/// All active filters
686#[derive(Debug, Clone, Default)]
687pub struct ActiveFilters<T> {
688    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
689}
690
691impl<T> ActiveFilters<T> {
692    /// Returns an empty instance.
693    pub fn new() -> Self {
694        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
695    }
696}
697
/// An installed filter
#[derive(Debug)]
struct ActiveFilter<T> {
    /// The block from which the next poll starts fetching changes.
    ///
    /// Set to the best block number when the filter is installed, and advanced to
    /// `best block + 1` by a poll that fetches changes.
    block: u64,
    /// Last time this filter was polled; compared against the stale filter ttl during cleanup.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
708
709/// A receiver for pending transactions that returns all new transactions since the last poll.
710#[derive(Debug, Clone)]
711struct PendingTransactionsReceiver {
712    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
713}
714
715impl PendingTransactionsReceiver {
716    fn new(receiver: Receiver<TxHash>) -> Self {
717        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
718    }
719
720    /// Returns all new pending transactions received since the last poll.
721    async fn drain<T>(&self) -> FilterChanges<T> {
722        let mut pending_txs = Vec::new();
723        let mut prepared_stream = self.txs_receiver.lock().await;
724
725        while let Ok(tx_hash) = prepared_stream.try_recv() {
726            pending_txs.push(tx_hash);
727        }
728
729        // Convert the vector of hashes into FilterChanges::Hashes
730        FilterChanges::Hashes(pending_txs)
731    }
732}
733
734/// A structure to manage and provide access to a stream of full transaction details.
735#[derive(Debug, Clone)]
736struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
737    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
738    tx_resp_builder: TxCompat,
739}
740
741impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
742where
743    T: PoolTransaction + 'static,
744    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
745{
746    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
747    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
748        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
749    }
750
751    /// Returns all new pending transactions received since the last poll.
752    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
753        let mut pending_txs = Vec::new();
754        let mut prepared_stream = self.txs_stream.lock().await;
755
756        while let Ok(tx) = prepared_stream.try_recv() {
757            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
758                Ok(tx) => pending_txs.push(tx),
759                Err(err) => {
760                    error!(target: "rpc",
761                        %err,
762                        "Failed to fill txn with block context"
763                    );
764                }
765            }
766        }
767        FilterChanges::Transactions(pending_txs)
768    }
769}
770
/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
///
/// With the pool transaction and converter types erased, a receiver can be stored as
/// `Arc<dyn FullTransactionsFilter<T>>`.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the changes collected since the previous call.
    async fn drain(&self) -> FilterChanges<T>;
}

#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
{
    /// Delegates to the inherent `FullTransactionsReceiver::drain`.
    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
        // `Self::drain` resolves to the inherent method (inherent items take precedence over
        // trait items), so this forwards to it instead of calling itself recursively.
        Self::drain(self).await
    }
}
788
789/// Represents the kind of pending transaction data that can be retrieved.
790///
791/// This enum differentiates between two kinds of pending transaction data:
792/// - Just the transaction hashes.
793/// - Full transaction details.
794#[derive(Debug, Clone)]
795enum PendingTransactionKind<T> {
796    Hashes(PendingTransactionsReceiver),
797    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
798}
799
800impl<T: 'static> PendingTransactionKind<T> {
801    async fn drain(&self) -> FilterChanges<T> {
802        match self {
803            Self::Hashes(receiver) => receiver.drain().await,
804            Self::FullTransaction(receiver) => receiver.drain().await,
805        }
806    }
807}
808
/// The kinds of filters that can be installed.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// A log filter, holding the filter object it was created with.
    Log(Box<Filter>),
    /// A filter that reports the hashes of new blocks.
    Block,
    /// A filter that reports pending transactions, either as hashes or as full transactions.
    PendingTransaction(PendingTransactionKind<T>),
}
815
/// An iterator that yields _inclusive_ block ranges of a given step size
///
/// Every item is a `(start, end)` pair with `end = min(start + step, range_end)`. Consecutive
/// items are contiguous: the next `start` is the previous `end + 1`.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block of the overall range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator that splits `range` into chunks of at most `step + 1` blocks.
    ///
    /// A `step` of `0` yields one single-block range per block. If `step + 1` does not fit into
    /// a `usize`, the step is clamped so that it does; the chunks are then smaller than
    /// requested but remain contiguous and non-overlapping.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // `step_by` takes a `usize` stride: convert without truncation and without overflowing
        // on the `+ 1`.
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        // Keep the chunk width in sync with the stride that is actually used. The cast widens
        // (`usize` is at most 64 bits) and is a no-op whenever `step + 1` fits into a `usize`.
        let step = step.min((stride - 1) as u64);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next `(start, end)` chunk, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start + step` may exceed `u64::MAX` near the top of the range, so saturate before
        // clamping to the range end. `start` never exceeds `self.end`, hence `start <= end`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
842
/// Errors that can occur in the handler implementation
///
/// Each variant is mapped onto a JSON-RPC error object by the `From<EthFilterError>`
/// implementation for `jsonrpsee::types::error::ErrorObject`.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// Filter not found.
    ///
    /// Carries the id of the missing filter; the id is not part of the error message.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// Invalid block range.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// Query scope is too broad.
    ///
    /// The value is the maximum block range that is reported in the error message.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// Query result is too large.
    ///
    /// The message suggests retrying with the `from_block`-`to_block` range.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response
        max_logs: usize,
        /// Start block of the suggested retry range
        from_block: u64,
        /// End block of the suggested retry range (last successfully processed block)
        to_block: u64,
    },
    /// Error serving request in `eth_` namespace.
    ///
    /// Displayed transparently, i.e. with the message of the wrapped [`EthApiError`].
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Error thrown when a spawned task failed to deliver a response.
    #[error("internal filter error")]
    InternalError,
}
872
873impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
874    fn from(err: EthFilterError) -> Self {
875        match err {
876            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
877                jsonrpsee::types::error::INVALID_PARAMS_CODE,
878                "filter not found",
879            ),
880            err @ EthFilterError::InternalError => {
881                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
882            }
883            EthFilterError::EthAPIError(err) => err.into(),
884            err @ (EthFilterError::InvalidBlockRangeParams |
885            EthFilterError::QueryExceedsMaxBlocks(_) |
886            EthFilterError::QueryExceedsMaxResults { .. }) => {
887                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
888            }
889        }
890    }
891}
892
893impl From<ProviderError> for EthFilterError {
894    fn from(err: ProviderError) -> Self {
895        Self::EthAPIError(err.into())
896    }
897}
898
/// Helper type for the common pattern of returning receipts, block and the original header that is
/// a match for the filter.
///
/// `P` is the provider whose receipt, block and header types are used for the fields.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// We always need the entire receipts for the matching block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// Block can be optional and we can fetch it lazily when needed.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// The header of the block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
912
/// Represents different modes for processing block ranges when filtering logs
///
/// The variant is chosen by `RangeMode::new` from the size of the block range, its distance
/// from the chain tip and the number of headers that are passed in.
enum RangeMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Use cache-based processing for recent blocks
    Cached(CachedMode<Eth>),
    /// Use range-based processing for older blocks
    Range(RangeBlockMode<Eth>),
}
922
923impl<
924        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
925    > RangeMode<Eth>
926{
927    /// Creates a new `RangeMode`.
928    fn new(
929        filter_inner: Arc<EthFilterInner<Eth>>,
930        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
931        from_block: u64,
932        to_block: u64,
933        max_headers_range: u64,
934        chain_tip: u64,
935    ) -> Self {
936        let block_count = to_block - from_block + 1;
937        let distance_from_tip = chain_tip.saturating_sub(to_block);
938
939        // Determine if we should use cached mode based on range characteristics
940        let use_cached_mode =
941            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
942
943        if use_cached_mode && !sealed_headers.is_empty() {
944            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
945        } else {
946            Self::Range(RangeBlockMode {
947                filter_inner,
948                iter: sealed_headers.into_iter().peekable(),
949                next: VecDeque::new(),
950                max_range: max_headers_range as usize,
951                pending_tasks: FuturesOrdered::new(),
952            })
953        }
954    }
955
956    /// Determines whether to use cached mode based on bloom filter matches and range size
957    const fn should_use_cached_mode(
958        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
959        block_count: u64,
960        distance_from_tip: u64,
961    ) -> bool {
962        // Headers are already filtered by bloom, so count equals length
963        let bloom_matches = headers.len();
964
965        // Calculate adjusted threshold based on bloom matches
966        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
967
968        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
969    }
970
971    /// Calculates the adjusted cache threshold based on bloom filter matches
972    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
973        // Only apply adjustments for larger ranges
974        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
975            return CACHED_MODE_BLOCK_THRESHOLD;
976        }
977
978        match bloom_matches {
979            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
980            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
981            _ => CACHED_MODE_BLOCK_THRESHOLD,
982        }
983    }
984
985    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
986    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
987        match self {
988            Self::Cached(cached) => cached.next().await,
989            Self::Range(range) => range.next().await,
990        }
991    }
992}
993
/// Mode for processing blocks using cache optimization for recent blocks
struct CachedMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state; used to reach the eth cache.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers that still have to be looked up, in iteration order.
    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
1001
1002impl<
1003        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1004    > CachedMode<Eth>
1005{
1006    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1007        for header in self.headers_iter.by_ref() {
1008            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1009            if let Some((receipts, maybe_block)) =
1010                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1011            {
1012                return Ok(Some(ReceiptBlockResult {
1013                    receipts,
1014                    recovered_block: maybe_block,
1015                    header,
1016                }));
1017            }
1018        }
1019
1020        Ok(None) // No more headers
1021    }
1022}
1023
/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
///
/// Each future resolves to the results collected for one chunk of headers, or to the error that
/// aborted that chunk.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1027
/// Mode for processing blocks using range queries for older blocks
struct RangeBlockMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state; used to reach the provider and the eth cache.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers still to be processed; peekable so that runs of consecutive block numbers can be
    /// batched together.
    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
    /// Results that are ready to be handed out, in the order they will be returned.
    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
    /// Maximum number of consecutive headers collected into a single batch.
    max_range: usize,
    // Stream of ongoing receipt fetching tasks
    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
}
1039
1040impl<
1041        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1042    > RangeBlockMode<Eth>
1043{
1044    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1045        loop {
1046            // First, try to return any already processed result from buffer
1047            if let Some(result) = self.next.pop_front() {
1048                return Ok(Some(result));
1049            }
1050
1051            // Try to get a completed task result if there are pending tasks
1052            if let Some(task_result) = self.pending_tasks.next().await {
1053                self.next.extend(task_result?);
1054                continue;
1055            }
1056
1057            // No pending tasks - try to generate more work
1058            let Some(next_header) = self.iter.next() else {
1059                // No more headers to process
1060                return Ok(None);
1061            };
1062
1063            let mut range_headers = Vec::with_capacity(self.max_range);
1064            range_headers.push(next_header);
1065
1066            // Collect consecutive blocks up to max_range size
1067            while range_headers.len() < self.max_range {
1068                let Some(peeked) = self.iter.peek() else { break };
1069                let Some(last_header) = range_headers.last() else { break };
1070
1071                let expected_next = last_header.number() + 1;
1072                if peeked.number() != expected_next {
1073                    debug!(
1074                        target: "rpc::eth::filter",
1075                        last_block = last_header.number(),
1076                        next_block = peeked.number(),
1077                        expected = expected_next,
1078                        range_size = range_headers.len(),
1079                        "Non-consecutive block detected, stopping range collection"
1080                    );
1081                    break; // Non-consecutive block, stop here
1082                }
1083
1084                let Some(next_header) = self.iter.next() else { break };
1085                range_headers.push(next_header);
1086            }
1087
1088            // Check if we should use parallel processing for large ranges
1089            let remaining_headers = self.iter.len() + range_headers.len();
1090            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1091                self.spawn_parallel_tasks(range_headers);
1092                // Continue loop to await the spawned tasks
1093            } else {
1094                // Process small range sequentially and add results to buffer
1095                if let Some(result) = self.process_small_range(range_headers).await? {
1096                    return Ok(Some(result));
1097                }
1098                // Continue loop to check for more work
1099            }
1100        }
1101    }
1102
1103    /// Process a small range of headers sequentially
1104    ///
1105    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1106    async fn process_small_range(
1107        &mut self,
1108        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1109    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1110        // Process each header individually to avoid queuing for all receipts
1111        for header in range_headers {
1112            // First check if already cached to avoid unnecessary provider calls
1113            let (maybe_block, maybe_receipts) = self
1114                .filter_inner
1115                .eth_cache()
1116                .maybe_cached_block_and_receipts(header.hash())
1117                .await?;
1118
1119            let receipts = match maybe_receipts {
1120                Some(receipts) => receipts,
1121                None => {
1122                    // Not cached - fetch directly from provider
1123                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1124                        Some(receipts) => Arc::new(receipts),
1125                        None => continue, // No receipts found
1126                    }
1127                }
1128            };
1129
1130            if !receipts.is_empty() {
1131                self.next.push_back(ReceiptBlockResult {
1132                    receipts,
1133                    recovered_block: maybe_block,
1134                    header,
1135                });
1136            }
1137        }
1138
1139        Ok(self.next.pop_front())
1140    }
1141
1142    /// Spawn parallel tasks for processing a large range of headers
1143    ///
1144    /// This is used when the remaining headers count is at or above
1145    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1146    fn spawn_parallel_tasks(
1147        &mut self,
1148        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1149    ) {
1150        // Split headers into chunks
1151        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1152        let header_chunks = range_headers
1153            .into_iter()
1154            .chunks(chunk_size)
1155            .into_iter()
1156            .map(|chunk| chunk.collect::<Vec<_>>())
1157            .collect::<Vec<_>>();
1158
1159        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1160        for chunk_headers in header_chunks {
1161            let filter_inner = self.filter_inner.clone();
1162            let chunk_task = Box::pin(async move {
1163                let chunk_task = tokio::task::spawn_blocking(move || {
1164                    let mut chunk_results = Vec::new();
1165
1166                    for header in chunk_headers {
1167                        // Fetch directly from provider - RangeMode is used for older blocks
1168                        // unlikely to be cached
1169                        let receipts = match filter_inner
1170                            .provider()
1171                            .receipts_by_block(header.hash().into())?
1172                        {
1173                            Some(receipts) => Arc::new(receipts),
1174                            None => continue, // No receipts found
1175                        };
1176
1177                        if !receipts.is_empty() {
1178                            chunk_results.push(ReceiptBlockResult {
1179                                receipts,
1180                                recovered_block: None,
1181                                header,
1182                            });
1183                        }
1184                    }
1185
1186                    Ok(chunk_results)
1187                });
1188
1189                // Await the blocking task and handle the result
1190                match chunk_task.await {
1191                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1192                    Ok(Err(e)) => Err(e),
1193                    Err(join_err) => {
1194                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1195                        Err(EthFilterError::InternalError)
1196                    }
1197                }
1198            });
1199
1200            self.pending_tasks.push_back(chunk_task);
1201        }
1202    }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use super::*;
1208    use crate::{eth::EthApi, EthApiBuilder};
1209    use alloy_network::Ethereum;
1210    use alloy_primitives::FixedBytes;
1211    use rand::Rng;
1212    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1213    use reth_ethereum_primitives::TxType;
1214    use reth_evm_ethereum::EthEvmConfig;
1215    use reth_network_api::noop::NoopNetwork;
1216    use reth_provider::test_utils::MockEthProvider;
1217    use reth_rpc_convert::RpcConverter;
1218    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1219    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1220    use reth_tasks::TokioTaskExecutor;
1221    use reth_testing_utils::generators;
1222    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1223    use std::{collections::VecDeque, sync::Arc};
1224
1225    #[test]
1226    fn test_block_range_iter() {
1227        let mut rng = generators::rng();
1228
1229        let start = rng.random::<u32>() as u64;
1230        let end = start.saturating_add(rng.random::<u32>() as u64);
1231        let step = rng.random::<u16>() as u64;
1232        let range = start..=end;
1233        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1234        let (from, mut end) = iter.next().unwrap();
1235        assert_eq!(from, start);
1236        assert_eq!(end, (from + step).min(*range.end()));
1237
1238        for (next_from, next_end) in iter {
1239            // ensure range starts with previous end + 1
1240            assert_eq!(next_from, end + 1);
1241            end = next_end;
1242        }
1243
1244        assert_eq!(end, *range.end());
1245    }
1246
    // Helper function to create a test EthApi instance
    //
    // The API is built on top of the given mock provider, a testing transaction pool, a no-op
    // network and an EVM config created from the provider's chain spec.
    #[expect(clippy::type_complexity)]
    fn build_test_eth_api(
        provider: MockEthProvider,
    ) -> EthApi<
        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
    > {
        EthApiBuilder::new(
            provider.clone(),
            testing_pool(),
            NoopNetwork::default(),
            EthEvmConfig::new(provider.chain_spec()),
        )
        .build()
    }
1263
1264    #[tokio::test]
1265    async fn test_range_block_mode_empty_range() {
1266        let provider = MockEthProvider::default();
1267        let eth_api = build_test_eth_api(provider);
1268
1269        let eth_filter = super::EthFilter::new(
1270            eth_api,
1271            EthFilterConfig::default(),
1272            Box::new(TokioTaskExecutor::default()),
1273        );
1274        let filter_inner = eth_filter.inner;
1275
1276        let headers = vec![];
1277        let max_range = 100;
1278
1279        let mut range_mode = RangeBlockMode {
1280            filter_inner,
1281            iter: headers.into_iter().peekable(),
1282            next: VecDeque::new(),
1283            max_range,
1284            pending_tasks: FuturesOrdered::new(),
1285        };
1286
1287        let result = range_mode.next().await;
1288        assert!(result.is_ok());
1289        assert!(result.unwrap().is_none());
1290    }
1291
    // Verifies that results already sitting in the `next` buffer are returned first and in FIFO
    // order, before any of the headers in `iter` are looked at.
    #[tokio::test]
    async fn test_range_block_mode_queued_results_priority() {
        let provider = MockEthProvider::default();
        let eth_api = build_test_eth_api(provider);

        let eth_filter = super::EthFilter::new(
            eth_api,
            EthFilterConfig::default(),
            Box::new(TokioTaskExecutor::default()),
        );
        let filter_inner = eth_filter.inner;

        // headers left in the iterator; their hashes are random and not registered with the
        // provider
        let headers = vec![
            SealedHeader::new(
                alloy_consensus::Header { number: 100, ..Default::default() },
                FixedBytes::random(),
            ),
            SealedHeader::new(
                alloy_consensus::Header { number: 101, ..Default::default() },
                FixedBytes::random(),
            ),
        ];

        // create specific mock results to test ordering
        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);

        // create mock receipts to test receipt handling
        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 100_000,
            logs: vec![],
            success: true,
        };
        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip1559,
            cumulative_gas_used: 200_000,
            logs: vec![],
            success: true,
        };
        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip2930,
            cumulative_gas_used: 150_000,
            logs: vec![],
            success: false, // Different success status
        };

        // pre-built results use block numbers (42, 43) that differ from the headers above, so
        // they cannot be confused with results derived from the iterator
        let mock_result_1 = ReceiptBlockResult {
            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
            recovered_block: None,
            header: SealedHeader::new(
                alloy_consensus::Header { number: 42, ..Default::default() },
                expected_block_hash_1,
            ),
        };

        let mock_result_2 = ReceiptBlockResult {
            receipts: Arc::new(vec![mock_receipt_3.clone()]),
            recovered_block: None,
            header: SealedHeader::new(
                alloy_consensus::Header { number: 43, ..Default::default() },
                expected_block_hash_2,
            ),
        };

        let mut range_mode = RangeBlockMode {
            filter_inner,
            iter: headers.into_iter().peekable(),
            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
            max_range: 100,
            pending_tasks: FuturesOrdered::new(),
        };

        // first call should return the first queued result (FIFO order)
        let result1 = range_mode.next().await;
        assert!(result1.is_ok());
        let receipt_result1 = result1.unwrap().unwrap();
        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
        assert_eq!(receipt_result1.header.number, 42);

        // verify receipts
        assert_eq!(receipt_result1.receipts.len(), 2);
        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
        assert_eq!(
            receipt_result1.receipts[0].cumulative_gas_used,
            mock_receipt_1.cumulative_gas_used
        );
        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
        assert_eq!(
            receipt_result1.receipts[1].cumulative_gas_used,
            mock_receipt_2.cumulative_gas_used
        );
        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);

        // second call should return the second queued result
        let result2 = range_mode.next().await;
        assert!(result2.is_ok());
        let receipt_result2 = result2.unwrap().unwrap();
        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
        assert_eq!(receipt_result2.header.number, 43);

        // verify receipts
        assert_eq!(receipt_result2.receipts.len(), 1);
        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
        assert_eq!(
            receipt_result2.receipts[0].cumulative_gas_used,
            mock_receipt_3.cumulative_gas_used
        );
        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);

        // queue should now be empty
        assert!(range_mode.next.is_empty());

        // the third call falls through to the header iterator; only success is asserted, not
        // the returned value
        let result3 = range_mode.next().await;
        assert!(result3.is_ok());
    }
1409
1410    #[tokio::test]
1411    async fn test_range_block_mode_single_block_no_receipts() {
1412        let provider = MockEthProvider::default();
1413        let eth_api = build_test_eth_api(provider);
1414
1415        let eth_filter = super::EthFilter::new(
1416            eth_api,
1417            EthFilterConfig::default(),
1418            Box::new(TokioTaskExecutor::default()),
1419        );
1420        let filter_inner = eth_filter.inner;
1421
1422        let headers = vec![SealedHeader::new(
1423            alloy_consensus::Header { number: 100, ..Default::default() },
1424            FixedBytes::random(),
1425        )];
1426
1427        let mut range_mode = RangeBlockMode {
1428            filter_inner,
1429            iter: headers.into_iter().peekable(),
1430            next: VecDeque::new(),
1431            max_range: 100,
1432            pending_tasks: FuturesOrdered::new(),
1433        };
1434
1435        let result = range_mode.next().await;
1436        assert!(result.is_ok());
1437    }
1438
    // Verifies that receipts are fetched for the headers handed to `RangeBlockMode`: blocks 100
    // and 101 have receipts added to the mock provider, block 102 has none and must not show up
    // in the results.
    #[tokio::test]
    async fn test_range_block_mode_provider_receipts() {
        let provider = MockEthProvider::default();

        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };

        let block_hash_1 = FixedBytes::random();
        let block_hash_2 = FixedBytes::random();
        let block_hash_3 = FixedBytes::random();

        // register all three headers with the provider under their hashes
        provider.add_header(block_hash_1, header_1.clone());
        provider.add_header(block_hash_2, header_2.clone());
        provider.add_header(block_hash_3, header_3.clone());

        // create mock receipts to test provider fetching with mock logs
        let mock_log = alloy_primitives::Log {
            address: alloy_primitives::Address::ZERO,
            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
        };

        let receipt_100_1 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 21_000,
            logs: vec![mock_log.clone()],
            success: true,
        };
        let receipt_100_2 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip1559,
            cumulative_gas_used: 42_000,
            logs: vec![mock_log.clone()],
            success: true,
        };
        let receipt_101_1 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip2930,
            cumulative_gas_used: 30_000,
            logs: vec![mock_log.clone()],
            success: false,
        };

        // receipts are only added for blocks 100 and 101
        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
        provider.add_receipts(101, vec![receipt_101_1.clone()]);

        let eth_api = build_test_eth_api(provider);

        let eth_filter = super::EthFilter::new(
            eth_api,
            EthFilterConfig::default(),
            Box::new(TokioTaskExecutor::default()),
        );
        let filter_inner = eth_filter.inner;

        let headers = vec![
            SealedHeader::new(header_1, block_hash_1),
            SealedHeader::new(header_2, block_hash_2),
            SealedHeader::new(header_3, block_hash_3),
        ];

        let mut range_mode = RangeBlockMode {
            filter_inner,
            iter: headers.into_iter().peekable(),
            next: VecDeque::new(),
            max_range: 3, // include the 3 blocks in the first queried results
            pending_tasks: FuturesOrdered::new(),
        };

        // first call should fetch receipts from provider and return first block with receipts
        let result = range_mode.next().await;
        assert!(result.is_ok());
        let receipt_result = result.unwrap().unwrap();

        assert_eq!(receipt_result.header.hash(), block_hash_1);
        assert_eq!(receipt_result.header.number, 100);
        assert_eq!(receipt_result.receipts.len(), 2);

        // verify receipts
        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
        assert_eq!(
            receipt_result.receipts[0].cumulative_gas_used,
            receipt_100_1.cumulative_gas_used
        );
        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);

        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
        assert_eq!(
            receipt_result.receipts[1].cumulative_gas_used,
            receipt_100_2.cumulative_gas_used
        );
        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);

        // second call should return the second block with receipts
        let result2 = range_mode.next().await;
        assert!(result2.is_ok());
        let receipt_result2 = result2.unwrap().unwrap();

        assert_eq!(receipt_result2.header.hash(), block_hash_2);
        assert_eq!(receipt_result2.header.number, 101);
        assert_eq!(receipt_result2.receipts.len(), 1);

        // verify receipts
        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
        assert_eq!(
            receipt_result2.receipts[0].cumulative_gas_used,
            receipt_101_1.cumulative_gas_used
        );
        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);

        // third call should return None since no more blocks with receipts
        let result3 = range_mode.next().await;
        assert!(result3.is_ok());
        assert!(result3.unwrap().is_none());
    }
1552
1553    #[tokio::test]
1554    async fn test_range_block_mode_iterator_exhaustion() {
1555        let provider = MockEthProvider::default();
1556
1557        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1558        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1559
1560        let block_hash_100 = FixedBytes::random();
1561        let block_hash_101 = FixedBytes::random();
1562
1563        // Associate headers with hashes first
1564        provider.add_header(block_hash_100, header_100.clone());
1565        provider.add_header(block_hash_101, header_101.clone());
1566
1567        // Add mock receipts so headers are actually processed
1568        let mock_receipt = reth_ethereum_primitives::Receipt {
1569            tx_type: TxType::Legacy,
1570            cumulative_gas_used: 21_000,
1571            logs: vec![],
1572            success: true,
1573        };
1574        provider.add_receipts(100, vec![mock_receipt.clone()]);
1575        provider.add_receipts(101, vec![mock_receipt.clone()]);
1576
1577        let eth_api = build_test_eth_api(provider);
1578
1579        let eth_filter = super::EthFilter::new(
1580            eth_api,
1581            EthFilterConfig::default(),
1582            Box::new(TokioTaskExecutor::default()),
1583        );
1584        let filter_inner = eth_filter.inner;
1585
1586        let headers = vec![
1587            SealedHeader::new(header_100, block_hash_100),
1588            SealedHeader::new(header_101, block_hash_101),
1589        ];
1590
1591        let mut range_mode = RangeBlockMode {
1592            filter_inner,
1593            iter: headers.into_iter().peekable(),
1594            next: VecDeque::new(),
1595            max_range: 1,
1596            pending_tasks: FuturesOrdered::new(),
1597        };
1598
1599        let result1 = range_mode.next().await;
1600        assert!(result1.is_ok());
1601        assert!(result1.unwrap().is_some()); // Should have processed block 100
1602
1603        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1604
1605        let result2 = range_mode.next().await;
1606        assert!(result2.is_ok());
1607        assert!(result2.unwrap().is_some()); // Should have processed block 101
1608
1609        // now iterator should be exhausted
1610        assert!(range_mode.iter.peek().is_none());
1611
1612        // further calls should return None
1613        let result3 = range_mode.next().await;
1614        assert!(result3.is_ok());
1615        assert!(result3.unwrap().is_none());
1616    }
1617
1618    #[tokio::test]
1619    async fn test_cached_mode_with_mock_receipts() {
1620        // create test data
1621        let test_hash = FixedBytes::from([42u8; 32]);
1622        let test_block_number = 100u64;
1623        let test_header = SealedHeader::new(
1624            alloy_consensus::Header {
1625                number: test_block_number,
1626                gas_used: 50_000,
1627                ..Default::default()
1628            },
1629            test_hash,
1630        );
1631
1632        // add a mock receipt to the provider with a mock log
1633        let mock_log = alloy_primitives::Log {
1634            address: alloy_primitives::Address::ZERO,
1635            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1636        };
1637
1638        let mock_receipt = reth_ethereum_primitives::Receipt {
1639            tx_type: TxType::Legacy,
1640            cumulative_gas_used: 21_000,
1641            logs: vec![mock_log],
1642            success: true,
1643        };
1644
1645        let provider = MockEthProvider::default();
1646        provider.add_header(test_hash, test_header.header().clone());
1647        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1648
1649        let eth_api = build_test_eth_api(provider);
1650        let eth_filter = super::EthFilter::new(
1651            eth_api,
1652            EthFilterConfig::default(),
1653            Box::new(TokioTaskExecutor::default()),
1654        );
1655        let filter_inner = eth_filter.inner;
1656
1657        let headers = vec![test_header.clone()];
1658
1659        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1660
1661        // should find the receipt from provider fallback (cache will be empty)
1662        let result = cached_mode.next().await.expect("next should succeed");
1663        let receipt_block_result = result.expect("should have receipt result");
1664        assert_eq!(receipt_block_result.header.hash(), test_hash);
1665        assert_eq!(receipt_block_result.header.number, test_block_number);
1666        assert_eq!(receipt_block_result.receipts.len(), 1);
1667        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1668        assert_eq!(
1669            receipt_block_result.receipts[0].cumulative_gas_used,
1670            mock_receipt.cumulative_gas_used
1671        );
1672        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1673
1674        // iterator should be exhausted
1675        let result2 = cached_mode.next().await;
1676        assert!(result2.is_ok());
1677        assert!(result2.unwrap().is_none());
1678    }
1679
1680    #[tokio::test]
1681    async fn test_cached_mode_empty_headers() {
1682        let provider = MockEthProvider::default();
1683        let eth_api = build_test_eth_api(provider);
1684
1685        let eth_filter = super::EthFilter::new(
1686            eth_api,
1687            EthFilterConfig::default(),
1688            Box::new(TokioTaskExecutor::default()),
1689        );
1690        let filter_inner = eth_filter.inner;
1691
1692        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1693
1694        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1695
1696        // should immediately return None for empty headers
1697        let result = cached_mode.next().await.expect("next should succeed");
1698        assert!(result.is_none());
1699    }
1700
1701    #[tokio::test]
1702    async fn test_non_consecutive_headers_after_bloom_filter() {
1703        let provider = MockEthProvider::default();
1704
1705        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1706        let mut expected_hashes = vec![];
1707        let mut prev_hash = alloy_primitives::B256::default();
1708
1709        // Create a transaction for blocks that will have receipts
1710        use alloy_consensus::TxLegacy;
1711        use reth_ethereum_primitives::{TransactionSigned, TxType};
1712
1713        let tx_inner = TxLegacy {
1714            chain_id: Some(1),
1715            nonce: 0,
1716            gas_price: 21_000,
1717            gas_limit: 21_000,
1718            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1719            value: alloy_primitives::U256::ZERO,
1720            input: alloy_primitives::Bytes::new(),
1721        };
1722        let signature = alloy_primitives::Signature::test_signature();
1723        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1724
1725        for i in 100u64..=103 {
1726            let header = alloy_consensus::Header {
1727                number: i,
1728                parent_hash: prev_hash,
1729                // Set bloom to match filter only for blocks 100 and 102
1730                logs_bloom: if i == 100 || i == 102 {
1731                    alloy_primitives::Bloom::from([1u8; 256])
1732                } else {
1733                    alloy_primitives::Bloom::default()
1734                },
1735                ..Default::default()
1736            };
1737
1738            let hash = header.hash_slow();
1739            expected_hashes.push(hash);
1740            prev_hash = hash;
1741
1742            // Add transaction to blocks that will have receipts (100 and 102)
1743            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1744
1745            let block = reth_ethereum_primitives::Block {
1746                header,
1747                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1748            };
1749            provider.add_block(hash, block);
1750        }
1751
1752        // Add receipts with logs only to blocks that match bloom
1753        let mock_log = alloy_primitives::Log {
1754            address: alloy_primitives::Address::ZERO,
1755            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1756        };
1757
1758        let receipt = reth_ethereum_primitives::Receipt {
1759            tx_type: TxType::Legacy,
1760            cumulative_gas_used: 21_000,
1761            logs: vec![mock_log],
1762            success: true,
1763        };
1764
1765        provider.add_receipts(100, vec![receipt.clone()]);
1766        provider.add_receipts(101, vec![]);
1767        provider.add_receipts(102, vec![receipt.clone()]);
1768        provider.add_receipts(103, vec![]);
1769
1770        // Add block body indices for each block so receipts can be fetched
1771        use reth_db_api::models::StoredBlockBodyIndices;
1772        provider
1773            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1774        provider
1775            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1776        provider
1777            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1778        provider
1779            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1780
1781        let eth_api = build_test_eth_api(provider);
1782        let eth_filter = EthFilter::new(
1783            eth_api,
1784            EthFilterConfig::default(),
1785            Box::new(TokioTaskExecutor::default()),
1786        );
1787
1788        // Use default filter which will match any non-empty bloom
1789        let filter = Filter::default();
1790
1791        // Get logs in the range - this will trigger the bloom filtering
1792        let logs = eth_filter
1793            .inner
1794            .clone()
1795            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1796            .await
1797            .expect("should succeed");
1798
1799        // We should get logs from blocks 100 and 102 only (bloom filtered)
1800        assert_eq!(logs.len(), 2);
1801
1802        assert_eq!(logs[0].block_number, Some(100));
1803        assert_eq!(logs[1].block_number, Some(102));
1804
1805        // Each block hash should be the hash of its own header, not derived from any other header
1806        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1807        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1808    }
1809}