reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{Sealable, TxHash};
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::{
11    future::TryFutureExt,
12    stream::{FuturesOrdered, StreamExt},
13    Future,
14};
15use itertools::Itertools;
16use jsonrpsee::{core::RpcResult, server::IdProvider};
17use reth_errors::ProviderError;
18use reth_primitives_traits::{NodePrimitives, SealedHeader};
19use reth_rpc_eth_api::{
20    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
21    RpcNodeCoreExt, RpcTransaction,
22};
23use reth_rpc_eth_types::{
24    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
25    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
26};
27use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
28use reth_storage_api::{
29    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
30    ProviderReceipt, ReceiptProvider,
31};
32use reth_tasks::TaskSpawner;
33use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
34use std::{
35    collections::{HashMap, VecDeque},
36    fmt,
37    iter::{Peekable, StepBy},
38    ops::RangeInclusive,
39    pin::Pin,
40    sync::Arc,
41    time::{Duration, Instant},
42};
43use tokio::{
44    sync::{mpsc::Receiver, oneshot, Mutex},
45    time::MissedTickBehavior,
46};
47use tracing::{debug, error, trace};
48
49impl<Eth> EngineEthFilter for EthFilter<Eth>
50where
51    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
52{
53    /// Returns logs matching given filter object, no query limits
54    fn logs(
55        &self,
56        filter: Filter,
57        limits: QueryLimits,
58    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
59        trace!(target: "rpc::eth", "Serving eth_getLogs");
60        self.logs_for_filter(filter, limits).map_err(|e| e.into())
61    }
62}
63
64/// Threshold for deciding between cached and range mode processing
65const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
66
67/// Threshold for bloom filter matches that triggers reduced caching
68const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
69
70/// Threshold for bloom filter matches that triggers moderately reduced caching
71const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
72
73/// Minimum block count to apply bloom filter match adjustments
74const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;
75
76/// The maximum number of headers we read at once when handling a range filter.
77const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
78
79/// Threshold for enabling parallel processing in range mode
80const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;
81
82/// Default concurrency for parallel processing
83const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
84
85/// `Eth` filter RPC implementation.
86///
87/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
88pub struct EthFilter<Eth: EthApiTypes> {
89    /// All nested fields bundled together
90    inner: Arc<EthFilterInner<Eth>>,
91}
92
93impl<Eth> Clone for EthFilter<Eth>
94where
95    Eth: EthApiTypes,
96{
97    fn clone(&self) -> Self {
98        Self { inner: self.inner.clone() }
99    }
100}
101
102impl<Eth> EthFilter<Eth>
103where
104    Eth: EthApiTypes + 'static,
105{
106    /// Creates a new, shareable instance.
107    ///
108    /// This uses the given pool to get notified about new transactions, the provider to interact
109    /// with the blockchain, the cache to fetch cacheable data, like the logs.
110    ///
111    /// See also [`EthFilterConfig`].
112    ///
113    /// This also spawns a task that periodically clears stale filters.
114    ///
115    /// # Create a new instance with [`EthApi`](crate::EthApi)
116    ///
117    /// ```no_run
118    /// use reth_evm_ethereum::EthEvmConfig;
119    /// use reth_network_api::noop::NoopNetwork;
120    /// use reth_provider::noop::NoopProvider;
121    /// use reth_rpc::{EthApi, EthFilter};
122    /// use reth_tasks::TokioTaskExecutor;
123    /// use reth_transaction_pool::noop::NoopTransactionPool;
124    /// let eth_api = EthApi::builder(
125    ///     NoopProvider::default(),
126    ///     NoopTransactionPool::default(),
127    ///     NoopNetwork::default(),
128    ///     EthEvmConfig::mainnet(),
129    /// )
130    /// .build();
131    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
132    /// ```
133    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
134        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
135            config;
136        let inner = EthFilterInner {
137            eth_api,
138            active_filters: ActiveFilters::new(),
139            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
140            max_headers_range: MAX_HEADERS_RANGE,
141            task_spawner,
142            stale_filter_ttl,
143            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
144        };
145
146        let eth_filter = Self { inner: Arc::new(inner) };
147
148        let this = eth_filter.clone();
149        eth_filter.inner.task_spawner.spawn_critical(
150            "eth-filters_stale-filters-clean",
151            Box::pin(async move {
152                this.watch_and_clear_stale_filters().await;
153            }),
154        );
155
156        eth_filter
157    }
158
159    /// Returns all currently active filters
160    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
161        &self.inner.active_filters
162    }
163
164    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
165    /// Nonetheless, this endless future frees the thread at every await point.
166    async fn watch_and_clear_stale_filters(&self) {
167        let mut interval = tokio::time::interval_at(
168            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
169            self.inner.stale_filter_ttl,
170        );
171        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
172        loop {
173            interval.tick().await;
174            self.clear_stale_filters(Instant::now()).await;
175        }
176    }
177
178    /// Clears all filters that have not been polled for longer than the configured
179    /// `stale_filter_ttl` at the given instant.
180    pub async fn clear_stale_filters(&self, now: Instant) {
181        trace!(target: "rpc::eth", "clear stale filters");
182        self.active_filters().inner.lock().await.retain(|id, filter| {
183            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
184
185            if !is_valid {
186                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
187            }
188
189            is_valid
190        })
191    }
192}
193
194impl<Eth> EthFilter<Eth>
195where
196    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
197{
198    /// Access the underlying provider.
199    fn provider(&self) -> &Eth::Provider {
200        self.inner.eth_api.provider()
201    }
202
203    /// Access the underlying pool.
204    fn pool(&self) -> &Eth::Pool {
205        self.inner.eth_api.pool()
206    }
207
208    /// Returns all the filter changes for the given id, if any
209    pub async fn filter_changes(
210        &self,
211        id: FilterId,
212    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
213        let info = self.provider().chain_info()?;
214        let best_number = info.best_number;
215
216        // start_block is the block from which we should start fetching changes, the next block from
217        // the last time changes were polled, in other words the best block at last poll + 1
218        let (start_block, kind) = {
219            let mut filters = self.inner.active_filters.inner.lock().await;
220            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
221
222            if filter.block > best_number {
223                // no new blocks since the last poll
224                return Ok(FilterChanges::Empty)
225            }
226
227            // update filter
228            // we fetch all changes from [filter.block..best_block], so we advance the filter's
229            // block to `best_block +1`, the next from which we should start fetching changes again
230            let mut block = best_number + 1;
231            std::mem::swap(&mut filter.block, &mut block);
232            filter.last_poll_timestamp = Instant::now();
233
234            (block, filter.kind.clone())
235        };
236
237        match kind {
238            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
239            FilterKind::Block => {
240                // Note: we need to fetch the block hashes from inclusive range
241                // [start_block..best_block]
242                let end_block = best_number + 1;
243                let block_hashes =
244                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
245                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
246                    )?;
247                Ok(FilterChanges::Hashes(block_hashes))
248            }
249            FilterKind::Log(filter) => {
250                let (from_block_number, to_block_number) = match filter.block_option {
251                    FilterBlockOption::Range { from_block, to_block } => {
252                        let from = from_block
253                            .map(|num| self.provider().convert_block_number(num))
254                            .transpose()?
255                            .flatten();
256                        let to = to_block
257                            .map(|num| self.provider().convert_block_number(num))
258                            .transpose()?
259                            .flatten();
260                        logs_utils::get_filter_block_range(from, to, start_block, info)
261                    }
262                    FilterBlockOption::AtBlockHash(_) => {
263                        // blockHash is equivalent to fromBlock = toBlock = the block number with
264                        // hash blockHash
265                        // get_logs_in_block_range is inclusive
266                        (start_block, best_number)
267                    }
268                };
269                let logs = self
270                    .inner
271                    .clone()
272                    .get_logs_in_block_range(
273                        *filter,
274                        from_block_number,
275                        to_block_number,
276                        self.inner.query_limits,
277                    )
278                    .await?;
279                Ok(FilterChanges::Logs(logs))
280            }
281        }
282    }
283
284    /// Returns an array of all logs matching filter with given id.
285    ///
286    /// Returns an error if no matching log filter exists.
287    ///
288    /// Handler for `eth_getFilterLogs`
289    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
290        let filter = {
291            let filters = self.inner.active_filters.inner.lock().await;
292            if let FilterKind::Log(ref filter) =
293                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
294            {
295                *filter.clone()
296            } else {
297                // Not a log filter
298                return Err(EthFilterError::FilterNotFound(id))
299            }
300        };
301
302        self.logs_for_filter(filter, self.inner.query_limits).await
303    }
304
305    /// Returns logs matching given filter object.
306    async fn logs_for_filter(
307        &self,
308        filter: Filter,
309        limits: QueryLimits,
310    ) -> Result<Vec<Log>, EthFilterError> {
311        self.inner.clone().logs_for_filter(filter, limits).await
312    }
313}
314
315#[async_trait]
316impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
317where
318    Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
319{
320    /// Handler for `eth_newFilter`
321    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
322        trace!(target: "rpc::eth", "Serving eth_newFilter");
323        self.inner
324            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
325            .await
326    }
327
328    /// Handler for `eth_newBlockFilter`
329    async fn new_block_filter(&self) -> RpcResult<FilterId> {
330        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
331        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
332    }
333
334    /// Handler for `eth_newPendingTransactionFilter`
335    async fn new_pending_transaction_filter(
336        &self,
337        kind: Option<PendingTransactionFilterKind>,
338    ) -> RpcResult<FilterId> {
339        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
340
341        let transaction_kind = match kind.unwrap_or_default() {
342            PendingTransactionFilterKind::Hashes => {
343                let receiver = self.pool().pending_transactions_listener();
344                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
345                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
346            }
347            PendingTransactionFilterKind::Full => {
348                let stream = self.pool().new_pending_pool_transactions_listener();
349                let full_txs_receiver = FullTransactionsReceiver::new(
350                    stream,
351                    dyn_clone::clone(self.inner.eth_api.tx_resp_builder()),
352                );
353                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
354                    full_txs_receiver,
355                )))
356            }
357        };
358
359        //let filter = FilterKind::PendingTransaction(transaction_kind);
360
361        // Install the filter and propagate any errors
362        self.inner.install_filter(transaction_kind).await
363    }
364
365    /// Handler for `eth_getFilterChanges`
366    async fn filter_changes(
367        &self,
368        id: FilterId,
369    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
370        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
371        Ok(Self::filter_changes(self, id).await?)
372    }
373
374    /// Returns an array of all logs matching filter with given id.
375    ///
376    /// Returns an error if no matching log filter exists.
377    ///
378    /// Handler for `eth_getFilterLogs`
379    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
380        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
381        Ok(Self::filter_logs(self, id).await?)
382    }
383
384    /// Handler for `eth_uninstallFilter`
385    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
386        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
387        let mut filters = self.inner.active_filters.inner.lock().await;
388        if filters.remove(&id).is_some() {
389            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
390            Ok(true)
391        } else {
392            Ok(false)
393        }
394    }
395
396    /// Returns logs matching given filter object.
397    ///
398    /// Handler for `eth_getLogs`
399    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
400        trace!(target: "rpc::eth", "Serving eth_getLogs");
401        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
402    }
403}
404
405impl<Eth> std::fmt::Debug for EthFilter<Eth>
406where
407    Eth: EthApiTypes,
408{
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        f.debug_struct("EthFilter").finish_non_exhaustive()
411    }
412}
413
414/// Container type `EthFilter`
415#[derive(Debug)]
416struct EthFilterInner<Eth: EthApiTypes> {
417    /// Inner `eth` API implementation.
418    eth_api: Eth,
419    /// All currently installed filters.
420    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
421    /// Provides ids to identify filters
422    id_provider: Arc<dyn IdProvider>,
423    /// limits for logs queries
424    query_limits: QueryLimits,
425    /// maximum number of headers to read at once for range filter
426    max_headers_range: u64,
427    /// The type that can spawn tasks.
428    task_spawner: Box<dyn TaskSpawner>,
429    /// Duration since the last filter poll, after which the filter is considered stale
430    stale_filter_ttl: Duration,
431}
432
433impl<Eth> EthFilterInner<Eth>
434where
435    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
436        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
437        + 'static,
438{
439    /// Access the underlying provider.
440    fn provider(&self) -> &Eth::Provider {
441        self.eth_api.provider()
442    }
443
444    /// Access the underlying [`EthStateCache`].
445    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
446        self.eth_api.cache()
447    }
448
449    /// Returns logs matching given filter object.
450    async fn logs_for_filter(
451        self: Arc<Self>,
452        filter: Filter,
453        limits: QueryLimits,
454    ) -> Result<Vec<Log>, EthFilterError> {
455        match filter.block_option {
456            FilterBlockOption::AtBlockHash(block_hash) => {
457                // for all matching logs in the block
458                // get the block header with the hash
459                let header = self
460                    .provider()
461                    .header_by_hash_or_number(block_hash.into())?
462                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
463
464                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
465
466                // we also need to ensure that the receipts are available and return an error if
467                // not, in case the block hash been reorged
468                let (receipts, maybe_block) = self
469                    .eth_cache()
470                    .get_receipts_and_maybe_block(block_num_hash.hash)
471                    .await?
472                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
473
474                let mut all_logs = Vec::new();
475                append_matching_block_logs(
476                    &mut all_logs,
477                    maybe_block
478                        .map(ProviderOrBlock::Block)
479                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
480                    &filter,
481                    block_num_hash,
482                    &receipts,
483                    false,
484                    header.timestamp(),
485                )?;
486
487                Ok(all_logs)
488            }
489            FilterBlockOption::Range { from_block, to_block } => {
490                // compute the range
491                let info = self.provider().chain_info()?;
492
493                // we start at the most recent block if unset in filter
494                let start_block = info.best_number;
495                let from = from_block
496                    .map(|num| self.provider().convert_block_number(num))
497                    .transpose()?
498                    .flatten();
499                let to = to_block
500                    .map(|num| self.provider().convert_block_number(num))
501                    .transpose()?
502                    .flatten();
503
504                if let Some(f) = from {
505                    if f > info.best_number {
506                        // start block higher than local head, can return empty
507                        return Ok(Vec::new());
508                    }
509                }
510
511                let (from_block_number, to_block_number) =
512                    logs_utils::get_filter_block_range(from, to, start_block, info);
513
514                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
515                    .await
516            }
517        }
518    }
519
520    /// Installs a new filter and returns the new identifier.
521    async fn install_filter(
522        &self,
523        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
524    ) -> RpcResult<FilterId> {
525        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
526        let subscription_id = self.id_provider.next_id();
527
528        let id = match subscription_id {
529            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
530            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
531        };
532        let mut filters = self.active_filters.inner.lock().await;
533        filters.insert(
534            id.clone(),
535            ActiveFilter {
536                block: last_poll_block_number,
537                last_poll_timestamp: Instant::now(),
538                kind,
539            },
540        );
541        Ok(id)
542    }
543
544    /// Returns all logs in the given _inclusive_ range that match the filter
545    ///
546    /// Returns an error if:
547    ///  - underlying database error
548    ///  - amount of matches exceeds configured limit
549    async fn get_logs_in_block_range(
550        self: Arc<Self>,
551        filter: Filter,
552        from_block: u64,
553        to_block: u64,
554        limits: QueryLimits,
555    ) -> Result<Vec<Log>, EthFilterError> {
556        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
557
558        // perform boundary checks first
559        if to_block < from_block {
560            return Err(EthFilterError::InvalidBlockRangeParams)
561        }
562
563        if let Some(max_blocks_per_filter) =
564            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
565        {
566            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
567        }
568
569        let (tx, rx) = oneshot::channel();
570        let this = self.clone();
571        self.task_spawner.spawn_blocking(Box::pin(async move {
572            let res =
573                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
574            let _ = tx.send(res);
575        }));
576
577        rx.await.map_err(|_| EthFilterError::InternalError)?
578    }
579
580    /// Returns all logs in the given _inclusive_ range that match the filter
581    ///
582    /// Note: This function uses a mix of blocking db operations for fetching indices and header
583    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
584    /// This function is considered blocking and should thus be spawned on a blocking task.
585    ///
586    /// Returns an error if:
587    ///  - underlying database error
588    async fn get_logs_in_block_range_inner(
589        self: Arc<Self>,
590        filter: &Filter,
591        from_block: u64,
592        to_block: u64,
593        limits: QueryLimits,
594    ) -> Result<Vec<Log>, EthFilterError> {
595        let mut all_logs = Vec::new();
596        let mut matching_headers = Vec::new();
597
598        // get current chain tip to determine processing mode
599        let chain_tip = self.provider().best_block_number()?;
600
601        // first collect all headers that match the bloom filter for cached mode decision
602        for (from, to) in
603            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
604        {
605            let headers = self.provider().headers_range(from..=to)?;
606
607            let mut headers_iter = headers.into_iter().peekable();
608
609            while let Some(header) = headers_iter.next() {
610                if !filter.matches_bloom(header.logs_bloom()) {
611                    continue
612                }
613
614                let current_number = header.number();
615
616                let block_hash = match headers_iter.peek() {
617                    Some(next_header) if next_header.number() == current_number + 1 => {
618                        // Headers are consecutive, use the more efficient parent_hash
619                        next_header.parent_hash()
620                    }
621                    _ => {
622                        // Headers not consecutive or last header, calculate hash
623                        header.hash_slow()
624                    }
625                };
626
627                matching_headers.push(SealedHeader::new(header, block_hash));
628            }
629        }
630
631        // initialize the appropriate range mode based on collected headers
632        let mut range_mode = RangeMode::new(
633            self.clone(),
634            matching_headers,
635            from_block,
636            to_block,
637            self.max_headers_range,
638            chain_tip,
639        );
640
641        // iterate through the range mode to get receipts and blocks
642        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
643            range_mode.next().await?
644        {
645            let num_hash = header.num_hash();
646            append_matching_block_logs(
647                &mut all_logs,
648                recovered_block
649                    .map(ProviderOrBlock::Block)
650                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
651                filter,
652                num_hash,
653                &receipts,
654                false,
655                header.timestamp(),
656            )?;
657
658            // size check but only if range is multiple blocks, so we always return all
659            // logs of a single block
660            let is_multi_block_range = from_block != to_block;
661            if let Some(max_logs_per_response) = limits.max_logs_per_response {
662                if is_multi_block_range && all_logs.len() > max_logs_per_response {
663                    debug!(
664                        target: "rpc::eth::filter",
665                        logs_found = all_logs.len(),
666                        max_logs_per_response,
667                        from_block,
668                        to_block = num_hash.number.saturating_sub(1),
669                        "Query exceeded max logs per response limit"
670                    );
671                    return Err(EthFilterError::QueryExceedsMaxResults {
672                        max_logs: max_logs_per_response,
673                        from_block,
674                        to_block: num_hash.number.saturating_sub(1),
675                    });
676                }
677            }
678        }
679
680        Ok(all_logs)
681    }
682}
683
684/// All active filters
685#[derive(Debug, Clone, Default)]
686pub struct ActiveFilters<T> {
687    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
688}
689
690impl<T> ActiveFilters<T> {
691    /// Returns an empty instance.
692    pub fn new() -> Self {
693        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
694    }
695}
696
697/// An installed filter
698#[derive(Debug)]
699struct ActiveFilter<T> {
700    /// At which block the filter was polled last.
701    block: u64,
702    /// Last time this filter was polled.
703    last_poll_timestamp: Instant,
704    /// What kind of filter it is.
705    kind: FilterKind<T>,
706}
707
708/// A receiver for pending transactions that returns all new transactions since the last poll.
709#[derive(Debug, Clone)]
710struct PendingTransactionsReceiver {
711    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
712}
713
714impl PendingTransactionsReceiver {
715    fn new(receiver: Receiver<TxHash>) -> Self {
716        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
717    }
718
719    /// Returns all new pending transactions received since the last poll.
720    async fn drain<T>(&self) -> FilterChanges<T> {
721        let mut pending_txs = Vec::new();
722        let mut prepared_stream = self.txs_receiver.lock().await;
723
724        while let Ok(tx_hash) = prepared_stream.try_recv() {
725            pending_txs.push(tx_hash);
726        }
727
728        // Convert the vector of hashes into FilterChanges::Hashes
729        FilterChanges::Hashes(pending_txs)
730    }
731}
732
733/// A structure to manage and provide access to a stream of full transaction details.
734#[derive(Debug, Clone)]
735struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
736    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
737    tx_resp_builder: TxCompat,
738}
739
740impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
741where
742    T: PoolTransaction + 'static,
743    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
744{
745    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
746    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
747        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
748    }
749
750    /// Returns all new pending transactions received since the last poll.
751    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
752        let mut pending_txs = Vec::new();
753        let mut prepared_stream = self.txs_stream.lock().await;
754
755        while let Ok(tx) = prepared_stream.try_recv() {
756            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
757                Ok(tx) => pending_txs.push(tx),
758                Err(err) => {
759                    error!(target: "rpc",
760                        %err,
761                        "Failed to fill txn with block context"
762                    );
763                }
764            }
765        }
766        FilterChanges::Transactions(pending_txs)
767    }
768}
769
770/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
771#[async_trait]
772trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
773    async fn drain(&self) -> FilterChanges<T>;
774}
775
776#[async_trait]
777impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
778    for FullTransactionsReceiver<T, TxCompat>
779where
780    T: PoolTransaction + 'static,
781    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
782{
783    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
784        Self::drain(self).await
785    }
786}
787
788/// Represents the kind of pending transaction data that can be retrieved.
789///
790/// This enum differentiates between two kinds of pending transaction data:
791/// - Just the transaction hashes.
792/// - Full transaction details.
793#[derive(Debug, Clone)]
794enum PendingTransactionKind<T> {
795    Hashes(PendingTransactionsReceiver),
796    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
797}
798
799impl<T: 'static> PendingTransactionKind<T> {
800    async fn drain(&self) -> FilterChanges<T> {
801        match self {
802            Self::Hashes(receiver) => receiver.drain().await,
803            Self::FullTransaction(receiver) => receiver.drain().await,
804        }
805    }
806}
807
808#[derive(Clone, Debug)]
809enum FilterKind<T> {
810    Log(Box<Filter>),
811    Block,
812    PendingTransaction(PendingTransactionKind<T>),
813}
814
815/// An iterator that yields _inclusive_ block ranges of a given step size
816#[derive(Debug)]
817struct BlockRangeInclusiveIter {
818    iter: StepBy<RangeInclusive<u64>>,
819    step: u64,
820    end: u64,
821}
822
823impl BlockRangeInclusiveIter {
824    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
825        Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
826    }
827}
828
829impl Iterator for BlockRangeInclusiveIter {
830    type Item = (u64, u64);
831
832    fn next(&mut self) -> Option<Self::Item> {
833        let start = self.iter.next()?;
834        let end = (start + self.step).min(self.end);
835        if start > end {
836            return None
837        }
838        Some((start, end))
839    }
840}
841
842/// Errors that can occur in the handler implementation
843#[derive(Debug, thiserror::Error)]
844pub enum EthFilterError {
845    /// Filter not found.
846    #[error("filter not found")]
847    FilterNotFound(FilterId),
848    /// Invalid block range.
849    #[error("invalid block range params")]
850    InvalidBlockRangeParams,
851    /// Query scope is too broad.
852    #[error("query exceeds max block range {0}")]
853    QueryExceedsMaxBlocks(u64),
854    /// Query result is too large.
855    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
856    QueryExceedsMaxResults {
857        /// Maximum number of logs allowed per response
858        max_logs: usize,
859        /// Start block of the suggested retry range
860        from_block: u64,
861        /// End block of the suggested retry range (last successfully processed block)
862        to_block: u64,
863    },
864    /// Error serving request in `eth_` namespace.
865    #[error(transparent)]
866    EthAPIError(#[from] EthApiError),
867    /// Error thrown when a spawned task failed to deliver a response.
868    #[error("internal filter error")]
869    InternalError,
870}
871
872impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
873    fn from(err: EthFilterError) -> Self {
874        match err {
875            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
876                jsonrpsee::types::error::INVALID_PARAMS_CODE,
877                "filter not found",
878            ),
879            err @ EthFilterError::InternalError => {
880                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
881            }
882            EthFilterError::EthAPIError(err) => err.into(),
883            err @ (EthFilterError::InvalidBlockRangeParams |
884            EthFilterError::QueryExceedsMaxBlocks(_) |
885            EthFilterError::QueryExceedsMaxResults { .. }) => {
886                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
887            }
888        }
889    }
890}
891
892impl From<ProviderError> for EthFilterError {
893    fn from(err: ProviderError) -> Self {
894        Self::EthAPIError(err.into())
895    }
896}
897
898/// Helper type for the common pattern of returning receipts, block and the original header that is
899/// a match for the filter.
900struct ReceiptBlockResult<P>
901where
902    P: ReceiptProvider + BlockReader,
903{
904    /// We always need the entire receipts for the matching block.
905    receipts: Arc<Vec<ProviderReceipt<P>>>,
906    /// Block can be optional and we can fetch it lazily when needed.
907    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
908    /// The header of the block.
909    header: SealedHeader<<P as HeaderProvider>::Header>,
910}
911
912/// Represents different modes for processing block ranges when filtering logs
913enum RangeMode<
914    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
915> {
916    /// Use cache-based processing for recent blocks
917    Cached(CachedMode<Eth>),
918    /// Use range-based processing for older blocks
919    Range(RangeBlockMode<Eth>),
920}
921
922impl<
923        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
924    > RangeMode<Eth>
925{
926    /// Creates a new `RangeMode`.
927    fn new(
928        filter_inner: Arc<EthFilterInner<Eth>>,
929        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
930        from_block: u64,
931        to_block: u64,
932        max_headers_range: u64,
933        chain_tip: u64,
934    ) -> Self {
935        let block_count = to_block - from_block + 1;
936        let distance_from_tip = chain_tip.saturating_sub(to_block);
937
938        // Determine if we should use cached mode based on range characteristics
939        let use_cached_mode =
940            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
941
942        if use_cached_mode && !sealed_headers.is_empty() {
943            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
944        } else {
945            Self::Range(RangeBlockMode {
946                filter_inner,
947                iter: sealed_headers.into_iter().peekable(),
948                next: VecDeque::new(),
949                max_range: max_headers_range as usize,
950                pending_tasks: FuturesOrdered::new(),
951            })
952        }
953    }
954
955    /// Determines whether to use cached mode based on bloom filter matches and range size
956    const fn should_use_cached_mode(
957        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
958        block_count: u64,
959        distance_from_tip: u64,
960    ) -> bool {
961        // Headers are already filtered by bloom, so count equals length
962        let bloom_matches = headers.len();
963
964        // Calculate adjusted threshold based on bloom matches
965        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
966
967        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
968    }
969
970    /// Calculates the adjusted cache threshold based on bloom filter matches
971    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
972        // Only apply adjustments for larger ranges
973        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
974            return CACHED_MODE_BLOCK_THRESHOLD;
975        }
976
977        match bloom_matches {
978            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
979            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
980            _ => CACHED_MODE_BLOCK_THRESHOLD,
981        }
982    }
983
984    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
985    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
986        match self {
987            Self::Cached(cached) => cached.next().await,
988            Self::Range(range) => range.next().await,
989        }
990    }
991}
992
993/// Mode for processing blocks using cache optimization for recent blocks
994struct CachedMode<
995    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
996> {
997    filter_inner: Arc<EthFilterInner<Eth>>,
998    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
999}
1000
1001impl<
1002        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1003    > CachedMode<Eth>
1004{
1005    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1006        for header in self.headers_iter.by_ref() {
1007            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1008            if let Some((receipts, maybe_block)) =
1009                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1010            {
1011                return Ok(Some(ReceiptBlockResult {
1012                    receipts,
1013                    recovered_block: maybe_block,
1014                    header,
1015                }));
1016            }
1017        }
1018
1019        Ok(None) // No more headers
1020    }
1021}
1022
1023/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1024type ReceiptFetchFuture<P> =
1025    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1026
1027/// Mode for processing blocks using range queries for older blocks
1028struct RangeBlockMode<
1029    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1030> {
1031    filter_inner: Arc<EthFilterInner<Eth>>,
1032    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1033    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1034    max_range: usize,
1035    // Stream of ongoing receipt fetching tasks
1036    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1037}
1038
1039impl<
1040        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1041    > RangeBlockMode<Eth>
1042{
1043    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1044        loop {
1045            // First, try to return any already processed result from buffer
1046            if let Some(result) = self.next.pop_front() {
1047                return Ok(Some(result));
1048            }
1049
1050            // Try to get a completed task result if there are pending tasks
1051            if let Some(task_result) = self.pending_tasks.next().await {
1052                self.next.extend(task_result?);
1053                continue;
1054            }
1055
1056            // No pending tasks - try to generate more work
1057            let Some(next_header) = self.iter.next() else {
1058                // No more headers to process
1059                return Ok(None);
1060            };
1061
1062            let mut range_headers = Vec::with_capacity(self.max_range);
1063            range_headers.push(next_header);
1064
1065            // Collect consecutive blocks up to max_range size
1066            while range_headers.len() < self.max_range {
1067                let Some(peeked) = self.iter.peek() else { break };
1068                let Some(last_header) = range_headers.last() else { break };
1069
1070                let expected_next = last_header.number() + 1;
1071                if peeked.number() != expected_next {
1072                    debug!(
1073                        target: "rpc::eth::filter",
1074                        last_block = last_header.number(),
1075                        next_block = peeked.number(),
1076                        expected = expected_next,
1077                        range_size = range_headers.len(),
1078                        "Non-consecutive block detected, stopping range collection"
1079                    );
1080                    break; // Non-consecutive block, stop here
1081                }
1082
1083                let Some(next_header) = self.iter.next() else { break };
1084                range_headers.push(next_header);
1085            }
1086
1087            // Check if we should use parallel processing for large ranges
1088            let remaining_headers = self.iter.len() + range_headers.len();
1089            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1090                self.spawn_parallel_tasks(range_headers);
1091                // Continue loop to await the spawned tasks
1092            } else {
1093                // Process small range sequentially and add results to buffer
1094                if let Some(result) = self.process_small_range(range_headers).await? {
1095                    return Ok(Some(result));
1096                }
1097                // Continue loop to check for more work
1098            }
1099        }
1100    }
1101
1102    /// Process a small range of headers sequentially
1103    ///
1104    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1105    async fn process_small_range(
1106        &mut self,
1107        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1108    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1109        // Process each header individually to avoid queuing for all receipts
1110        for header in range_headers {
1111            // First check if already cached to avoid unnecessary provider calls
1112            let (maybe_block, maybe_receipts) = self
1113                .filter_inner
1114                .eth_cache()
1115                .maybe_cached_block_and_receipts(header.hash())
1116                .await?;
1117
1118            let receipts = match maybe_receipts {
1119                Some(receipts) => receipts,
1120                None => {
1121                    // Not cached - fetch directly from provider
1122                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1123                        Some(receipts) => Arc::new(receipts),
1124                        None => continue, // No receipts found
1125                    }
1126                }
1127            };
1128
1129            if !receipts.is_empty() {
1130                self.next.push_back(ReceiptBlockResult {
1131                    receipts,
1132                    recovered_block: maybe_block,
1133                    header,
1134                });
1135            }
1136        }
1137
1138        Ok(self.next.pop_front())
1139    }
1140
1141    /// Spawn parallel tasks for processing a large range of headers
1142    ///
1143    /// This is used when the remaining headers count is at or above
1144    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1145    fn spawn_parallel_tasks(
1146        &mut self,
1147        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1148    ) {
1149        // Split headers into chunks
1150        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1151        let header_chunks = range_headers
1152            .into_iter()
1153            .chunks(chunk_size)
1154            .into_iter()
1155            .map(|chunk| chunk.collect::<Vec<_>>())
1156            .collect::<Vec<_>>();
1157
1158        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1159        for chunk_headers in header_chunks {
1160            let filter_inner = self.filter_inner.clone();
1161            let chunk_task = Box::pin(async move {
1162                let chunk_task = tokio::task::spawn_blocking(move || {
1163                    let mut chunk_results = Vec::new();
1164
1165                    for header in chunk_headers {
1166                        // Fetch directly from provider - RangeMode is used for older blocks
1167                        // unlikely to be cached
1168                        let receipts = match filter_inner
1169                            .provider()
1170                            .receipts_by_block(header.hash().into())?
1171                        {
1172                            Some(receipts) => Arc::new(receipts),
1173                            None => continue, // No receipts found
1174                        };
1175
1176                        if !receipts.is_empty() {
1177                            chunk_results.push(ReceiptBlockResult {
1178                                receipts,
1179                                recovered_block: None,
1180                                header,
1181                            });
1182                        }
1183                    }
1184
1185                    Ok(chunk_results)
1186                });
1187
1188                // Await the blocking task and handle the result
1189                match chunk_task.await {
1190                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1191                    Ok(Err(e)) => Err(e),
1192                    Err(join_err) => {
1193                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1194                        Err(EthFilterError::InternalError)
1195                    }
1196                }
1197            });
1198
1199            self.pending_tasks.push_back(chunk_task);
1200        }
1201    }
1202}
1203
1204#[cfg(test)]
1205mod tests {
1206    use super::*;
1207    use crate::{eth::EthApi, EthApiBuilder};
1208    use alloy_network::Ethereum;
1209    use alloy_primitives::FixedBytes;
1210    use rand::Rng;
1211    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1212    use reth_ethereum_primitives::TxType;
1213    use reth_evm_ethereum::EthEvmConfig;
1214    use reth_network_api::noop::NoopNetwork;
1215    use reth_provider::test_utils::MockEthProvider;
1216    use reth_rpc_convert::RpcConverter;
1217    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1218    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1219    use reth_tasks::TokioTaskExecutor;
1220    use reth_testing_utils::generators;
1221    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1222    use std::{collections::VecDeque, sync::Arc};
1223
1224    #[test]
1225    fn test_block_range_iter() {
1226        let mut rng = generators::rng();
1227
1228        let start = rng.random::<u32>() as u64;
1229        let end = start.saturating_add(rng.random::<u32>() as u64);
1230        let step = rng.random::<u16>() as u64;
1231        let range = start..=end;
1232        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1233        let (from, mut end) = iter.next().unwrap();
1234        assert_eq!(from, start);
1235        assert_eq!(end, (from + step).min(*range.end()));
1236
1237        for (next_from, next_end) in iter {
1238            // ensure range starts with previous end + 1
1239            assert_eq!(next_from, end + 1);
1240            end = next_end;
1241        }
1242
1243        assert_eq!(end, *range.end());
1244    }
1245
1246    // Helper function to create a test EthApi instance
1247    #[expect(clippy::type_complexity)]
1248    fn build_test_eth_api(
1249        provider: MockEthProvider,
1250    ) -> EthApi<
1251        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1252        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1253    > {
1254        EthApiBuilder::new(
1255            provider.clone(),
1256            testing_pool(),
1257            NoopNetwork::default(),
1258            EthEvmConfig::new(provider.chain_spec()),
1259        )
1260        .build()
1261    }
1262
1263    #[tokio::test]
1264    async fn test_range_block_mode_empty_range() {
1265        let provider = MockEthProvider::default();
1266        let eth_api = build_test_eth_api(provider);
1267
1268        let eth_filter = super::EthFilter::new(
1269            eth_api,
1270            EthFilterConfig::default(),
1271            Box::new(TokioTaskExecutor::default()),
1272        );
1273        let filter_inner = eth_filter.inner;
1274
1275        let headers = vec![];
1276        let max_range = 100;
1277
1278        let mut range_mode = RangeBlockMode {
1279            filter_inner,
1280            iter: headers.into_iter().peekable(),
1281            next: VecDeque::new(),
1282            max_range,
1283            pending_tasks: FuturesOrdered::new(),
1284        };
1285
1286        let result = range_mode.next().await;
1287        assert!(result.is_ok());
1288        assert!(result.unwrap().is_none());
1289    }
1290
1291    #[tokio::test]
1292    async fn test_range_block_mode_queued_results_priority() {
1293        let provider = MockEthProvider::default();
1294        let eth_api = build_test_eth_api(provider);
1295
1296        let eth_filter = super::EthFilter::new(
1297            eth_api,
1298            EthFilterConfig::default(),
1299            Box::new(TokioTaskExecutor::default()),
1300        );
1301        let filter_inner = eth_filter.inner;
1302
1303        let headers = vec![
1304            SealedHeader::new(
1305                alloy_consensus::Header { number: 100, ..Default::default() },
1306                FixedBytes::random(),
1307            ),
1308            SealedHeader::new(
1309                alloy_consensus::Header { number: 101, ..Default::default() },
1310                FixedBytes::random(),
1311            ),
1312        ];
1313
1314        // create specific mock results to test ordering
1315        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1316        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1317
1318        // create mock receipts to test receipt handling
1319        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1320            tx_type: TxType::Legacy,
1321            cumulative_gas_used: 100_000,
1322            logs: vec![],
1323            success: true,
1324        };
1325        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1326            tx_type: TxType::Eip1559,
1327            cumulative_gas_used: 200_000,
1328            logs: vec![],
1329            success: true,
1330        };
1331        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1332            tx_type: TxType::Eip2930,
1333            cumulative_gas_used: 150_000,
1334            logs: vec![],
1335            success: false, // Different success status
1336        };
1337
1338        let mock_result_1 = ReceiptBlockResult {
1339            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1340            recovered_block: None,
1341            header: SealedHeader::new(
1342                alloy_consensus::Header { number: 42, ..Default::default() },
1343                expected_block_hash_1,
1344            ),
1345        };
1346
1347        let mock_result_2 = ReceiptBlockResult {
1348            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1349            recovered_block: None,
1350            header: SealedHeader::new(
1351                alloy_consensus::Header { number: 43, ..Default::default() },
1352                expected_block_hash_2,
1353            ),
1354        };
1355
1356        let mut range_mode = RangeBlockMode {
1357            filter_inner,
1358            iter: headers.into_iter().peekable(),
1359            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1360            max_range: 100,
1361            pending_tasks: FuturesOrdered::new(),
1362        };
1363
1364        // first call should return the first queued result (FIFO order)
1365        let result1 = range_mode.next().await;
1366        assert!(result1.is_ok());
1367        let receipt_result1 = result1.unwrap().unwrap();
1368        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1369        assert_eq!(receipt_result1.header.number, 42);
1370
1371        // verify receipts
1372        assert_eq!(receipt_result1.receipts.len(), 2);
1373        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1374        assert_eq!(
1375            receipt_result1.receipts[0].cumulative_gas_used,
1376            mock_receipt_1.cumulative_gas_used
1377        );
1378        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1379        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1380        assert_eq!(
1381            receipt_result1.receipts[1].cumulative_gas_used,
1382            mock_receipt_2.cumulative_gas_used
1383        );
1384        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1385
1386        // second call should return the second queued result
1387        let result2 = range_mode.next().await;
1388        assert!(result2.is_ok());
1389        let receipt_result2 = result2.unwrap().unwrap();
1390        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1391        assert_eq!(receipt_result2.header.number, 43);
1392
1393        // verify receipts
1394        assert_eq!(receipt_result2.receipts.len(), 1);
1395        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1396        assert_eq!(
1397            receipt_result2.receipts[0].cumulative_gas_used,
1398            mock_receipt_3.cumulative_gas_used
1399        );
1400        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1401
1402        // queue should now be empty
1403        assert!(range_mode.next.is_empty());
1404
1405        let result3 = range_mode.next().await;
1406        assert!(result3.is_ok());
1407    }
1408
1409    #[tokio::test]
1410    async fn test_range_block_mode_single_block_no_receipts() {
1411        let provider = MockEthProvider::default();
1412        let eth_api = build_test_eth_api(provider);
1413
1414        let eth_filter = super::EthFilter::new(
1415            eth_api,
1416            EthFilterConfig::default(),
1417            Box::new(TokioTaskExecutor::default()),
1418        );
1419        let filter_inner = eth_filter.inner;
1420
1421        let headers = vec![SealedHeader::new(
1422            alloy_consensus::Header { number: 100, ..Default::default() },
1423            FixedBytes::random(),
1424        )];
1425
1426        let mut range_mode = RangeBlockMode {
1427            filter_inner,
1428            iter: headers.into_iter().peekable(),
1429            next: VecDeque::new(),
1430            max_range: 100,
1431            pending_tasks: FuturesOrdered::new(),
1432        };
1433
1434        let result = range_mode.next().await;
1435        assert!(result.is_ok());
1436    }
1437
1438    #[tokio::test]
1439    async fn test_range_block_mode_provider_receipts() {
1440        let provider = MockEthProvider::default();
1441
1442        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1443        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1444        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1445
1446        let block_hash_1 = FixedBytes::random();
1447        let block_hash_2 = FixedBytes::random();
1448        let block_hash_3 = FixedBytes::random();
1449
1450        provider.add_header(block_hash_1, header_1.clone());
1451        provider.add_header(block_hash_2, header_2.clone());
1452        provider.add_header(block_hash_3, header_3.clone());
1453
1454        // create mock receipts to test provider fetching with mock logs
1455        let mock_log = alloy_primitives::Log {
1456            address: alloy_primitives::Address::ZERO,
1457            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1458        };
1459
1460        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1461            tx_type: TxType::Legacy,
1462            cumulative_gas_used: 21_000,
1463            logs: vec![mock_log.clone()],
1464            success: true,
1465        };
1466        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1467            tx_type: TxType::Eip1559,
1468            cumulative_gas_used: 42_000,
1469            logs: vec![mock_log.clone()],
1470            success: true,
1471        };
1472        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1473            tx_type: TxType::Eip2930,
1474            cumulative_gas_used: 30_000,
1475            logs: vec![mock_log.clone()],
1476            success: false,
1477        };
1478
1479        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1480        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1481
1482        let eth_api = build_test_eth_api(provider);
1483
1484        let eth_filter = super::EthFilter::new(
1485            eth_api,
1486            EthFilterConfig::default(),
1487            Box::new(TokioTaskExecutor::default()),
1488        );
1489        let filter_inner = eth_filter.inner;
1490
1491        let headers = vec![
1492            SealedHeader::new(header_1, block_hash_1),
1493            SealedHeader::new(header_2, block_hash_2),
1494            SealedHeader::new(header_3, block_hash_3),
1495        ];
1496
1497        let mut range_mode = RangeBlockMode {
1498            filter_inner,
1499            iter: headers.into_iter().peekable(),
1500            next: VecDeque::new(),
1501            max_range: 3, // include the 3 blocks in the first queried results
1502            pending_tasks: FuturesOrdered::new(),
1503        };
1504
1505        // first call should fetch receipts from provider and return first block with receipts
1506        let result = range_mode.next().await;
1507        assert!(result.is_ok());
1508        let receipt_result = result.unwrap().unwrap();
1509
1510        assert_eq!(receipt_result.header.hash(), block_hash_1);
1511        assert_eq!(receipt_result.header.number, 100);
1512        assert_eq!(receipt_result.receipts.len(), 2);
1513
1514        // verify receipts
1515        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1516        assert_eq!(
1517            receipt_result.receipts[0].cumulative_gas_used,
1518            receipt_100_1.cumulative_gas_used
1519        );
1520        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1521
1522        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1523        assert_eq!(
1524            receipt_result.receipts[1].cumulative_gas_used,
1525            receipt_100_2.cumulative_gas_used
1526        );
1527        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1528
1529        // second call should return the second block with receipts
1530        let result2 = range_mode.next().await;
1531        assert!(result2.is_ok());
1532        let receipt_result2 = result2.unwrap().unwrap();
1533
1534        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1535        assert_eq!(receipt_result2.header.number, 101);
1536        assert_eq!(receipt_result2.receipts.len(), 1);
1537
1538        // verify receipts
1539        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1540        assert_eq!(
1541            receipt_result2.receipts[0].cumulative_gas_used,
1542            receipt_101_1.cumulative_gas_used
1543        );
1544        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1545
1546        // third call should return None since no more blocks with receipts
1547        let result3 = range_mode.next().await;
1548        assert!(result3.is_ok());
1549        assert!(result3.unwrap().is_none());
1550    }
1551
1552    #[tokio::test]
1553    async fn test_range_block_mode_iterator_exhaustion() {
1554        let provider = MockEthProvider::default();
1555
1556        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1557        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1558
1559        let block_hash_100 = FixedBytes::random();
1560        let block_hash_101 = FixedBytes::random();
1561
1562        // Associate headers with hashes first
1563        provider.add_header(block_hash_100, header_100.clone());
1564        provider.add_header(block_hash_101, header_101.clone());
1565
1566        // Add mock receipts so headers are actually processed
1567        let mock_receipt = reth_ethereum_primitives::Receipt {
1568            tx_type: TxType::Legacy,
1569            cumulative_gas_used: 21_000,
1570            logs: vec![],
1571            success: true,
1572        };
1573        provider.add_receipts(100, vec![mock_receipt.clone()]);
1574        provider.add_receipts(101, vec![mock_receipt.clone()]);
1575
1576        let eth_api = build_test_eth_api(provider);
1577
1578        let eth_filter = super::EthFilter::new(
1579            eth_api,
1580            EthFilterConfig::default(),
1581            Box::new(TokioTaskExecutor::default()),
1582        );
1583        let filter_inner = eth_filter.inner;
1584
1585        let headers = vec![
1586            SealedHeader::new(header_100, block_hash_100),
1587            SealedHeader::new(header_101, block_hash_101),
1588        ];
1589
1590        let mut range_mode = RangeBlockMode {
1591            filter_inner,
1592            iter: headers.into_iter().peekable(),
1593            next: VecDeque::new(),
1594            max_range: 1,
1595            pending_tasks: FuturesOrdered::new(),
1596        };
1597
1598        let result1 = range_mode.next().await;
1599        assert!(result1.is_ok());
1600        assert!(result1.unwrap().is_some()); // Should have processed block 100
1601
1602        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1603
1604        let result2 = range_mode.next().await;
1605        assert!(result2.is_ok());
1606        assert!(result2.unwrap().is_some()); // Should have processed block 101
1607
1608        // now iterator should be exhausted
1609        assert!(range_mode.iter.peek().is_none());
1610
1611        // further calls should return None
1612        let result3 = range_mode.next().await;
1613        assert!(result3.is_ok());
1614        assert!(result3.unwrap().is_none());
1615    }
1616
1617    #[tokio::test]
1618    async fn test_cached_mode_with_mock_receipts() {
1619        // create test data
1620        let test_hash = FixedBytes::from([42u8; 32]);
1621        let test_block_number = 100u64;
1622        let test_header = SealedHeader::new(
1623            alloy_consensus::Header {
1624                number: test_block_number,
1625                gas_used: 50_000,
1626                ..Default::default()
1627            },
1628            test_hash,
1629        );
1630
1631        // add a mock receipt to the provider with a mock log
1632        let mock_log = alloy_primitives::Log {
1633            address: alloy_primitives::Address::ZERO,
1634            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1635        };
1636
1637        let mock_receipt = reth_ethereum_primitives::Receipt {
1638            tx_type: TxType::Legacy,
1639            cumulative_gas_used: 21_000,
1640            logs: vec![mock_log],
1641            success: true,
1642        };
1643
1644        let provider = MockEthProvider::default();
1645        provider.add_header(test_hash, test_header.header().clone());
1646        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1647
1648        let eth_api = build_test_eth_api(provider);
1649        let eth_filter = super::EthFilter::new(
1650            eth_api,
1651            EthFilterConfig::default(),
1652            Box::new(TokioTaskExecutor::default()),
1653        );
1654        let filter_inner = eth_filter.inner;
1655
1656        let headers = vec![test_header.clone()];
1657
1658        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1659
1660        // should find the receipt from provider fallback (cache will be empty)
1661        let result = cached_mode.next().await.expect("next should succeed");
1662        let receipt_block_result = result.expect("should have receipt result");
1663        assert_eq!(receipt_block_result.header.hash(), test_hash);
1664        assert_eq!(receipt_block_result.header.number, test_block_number);
1665        assert_eq!(receipt_block_result.receipts.len(), 1);
1666        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1667        assert_eq!(
1668            receipt_block_result.receipts[0].cumulative_gas_used,
1669            mock_receipt.cumulative_gas_used
1670        );
1671        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1672
1673        // iterator should be exhausted
1674        let result2 = cached_mode.next().await;
1675        assert!(result2.is_ok());
1676        assert!(result2.unwrap().is_none());
1677    }
1678
1679    #[tokio::test]
1680    async fn test_cached_mode_empty_headers() {
1681        let provider = MockEthProvider::default();
1682        let eth_api = build_test_eth_api(provider);
1683
1684        let eth_filter = super::EthFilter::new(
1685            eth_api,
1686            EthFilterConfig::default(),
1687            Box::new(TokioTaskExecutor::default()),
1688        );
1689        let filter_inner = eth_filter.inner;
1690
1691        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1692
1693        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1694
1695        // should immediately return None for empty headers
1696        let result = cached_mode.next().await.expect("next should succeed");
1697        assert!(result.is_none());
1698    }
1699
1700    #[tokio::test]
1701    async fn test_non_consecutive_headers_after_bloom_filter() {
1702        let provider = MockEthProvider::default();
1703
1704        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1705        let mut expected_hashes = vec![];
1706        let mut prev_hash = alloy_primitives::B256::default();
1707
1708        // Create a transaction for blocks that will have receipts
1709        use alloy_consensus::TxLegacy;
1710        use reth_ethereum_primitives::{TransactionSigned, TxType};
1711
1712        let tx_inner = TxLegacy {
1713            chain_id: Some(1),
1714            nonce: 0,
1715            gas_price: 21_000,
1716            gas_limit: 21_000,
1717            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1718            value: alloy_primitives::U256::ZERO,
1719            input: alloy_primitives::Bytes::new(),
1720        };
1721        let signature = alloy_primitives::Signature::test_signature();
1722        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1723
1724        for i in 100u64..=103 {
1725            let header = alloy_consensus::Header {
1726                number: i,
1727                parent_hash: prev_hash,
1728                // Set bloom to match filter only for blocks 100 and 102
1729                logs_bloom: if i == 100 || i == 102 {
1730                    alloy_primitives::Bloom::from([1u8; 256])
1731                } else {
1732                    alloy_primitives::Bloom::default()
1733                },
1734                ..Default::default()
1735            };
1736
1737            let hash = header.hash_slow();
1738            expected_hashes.push(hash);
1739            prev_hash = hash;
1740
1741            // Add transaction to blocks that will have receipts (100 and 102)
1742            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1743
1744            let block = reth_ethereum_primitives::Block {
1745                header,
1746                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1747            };
1748            provider.add_block(hash, block);
1749        }
1750
1751        // Add receipts with logs only to blocks that match bloom
1752        let mock_log = alloy_primitives::Log {
1753            address: alloy_primitives::Address::ZERO,
1754            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1755        };
1756
1757        let receipt = reth_ethereum_primitives::Receipt {
1758            tx_type: TxType::Legacy,
1759            cumulative_gas_used: 21_000,
1760            logs: vec![mock_log],
1761            success: true,
1762        };
1763
1764        provider.add_receipts(100, vec![receipt.clone()]);
1765        provider.add_receipts(101, vec![]);
1766        provider.add_receipts(102, vec![receipt.clone()]);
1767        provider.add_receipts(103, vec![]);
1768
1769        // Add block body indices for each block so receipts can be fetched
1770        use reth_db_api::models::StoredBlockBodyIndices;
1771        provider
1772            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1773        provider
1774            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1775        provider
1776            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1777        provider
1778            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1779
1780        let eth_api = build_test_eth_api(provider);
1781        let eth_filter = EthFilter::new(
1782            eth_api,
1783            EthFilterConfig::default(),
1784            Box::new(TokioTaskExecutor::default()),
1785        );
1786
1787        // Use default filter which will match any non-empty bloom
1788        let filter = Filter::default();
1789
1790        // Get logs in the range - this will trigger the bloom filtering
1791        let logs = eth_filter
1792            .inner
1793            .clone()
1794            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1795            .await
1796            .expect("should succeed");
1797
1798        // We should get logs from blocks 100 and 102 only (bloom filtered)
1799        assert_eq!(logs.len(), 2);
1800
1801        assert_eq!(logs[0].block_number, Some(100));
1802        assert_eq!(logs[1].block_number, Some(102));
1803
1804        // Each block hash should be the hash of its own header, not derived from any other header
1805        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1806        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1807    }
1808}