reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{Sealable, TxHash};
5use alloy_rpc_types_eth::{
6    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7    PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::{
11    future::TryFutureExt,
12    stream::{FuturesOrdered, StreamExt},
13    Future,
14};
15use itertools::Itertools;
16use jsonrpsee::{core::RpcResult, server::IdProvider};
17use reth_errors::ProviderError;
18use reth_primitives_traits::{NodePrimitives, SealedHeader};
19use reth_rpc_eth_api::{
20    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
21    RpcNodeCoreExt, RpcTransaction,
22};
23use reth_rpc_eth_types::{
24    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
25    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
26};
27use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
28use reth_storage_api::{
29    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
30    ProviderReceipt, ReceiptProvider,
31};
32use reth_tasks::TaskSpawner;
33use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
34use std::{
35    collections::{HashMap, VecDeque},
36    fmt,
37    iter::{Peekable, StepBy},
38    ops::RangeInclusive,
39    pin::Pin,
40    sync::Arc,
41    time::{Duration, Instant},
42};
43use tokio::{
44    sync::{mpsc::Receiver, oneshot, Mutex},
45    time::MissedTickBehavior,
46};
47use tracing::{debug, error, trace};
48
49impl<Eth> EngineEthFilter for EthFilter<Eth>
50where
51    Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
52{
53    /// Returns logs matching given filter object, no query limits
54    fn logs(
55        &self,
56        filter: Filter,
57        limits: QueryLimits,
58    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
59        trace!(target: "rpc::eth", "Serving eth_getLogs");
60        self.logs_for_filter(filter, limits).map_err(|e| e.into())
61    }
62}
63
/// Threshold for deciding between cached and range mode processing.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Threshold for bloom filter matches that triggers reduced caching.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Threshold for bloom filter matches that triggers moderately reduced caching.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum block count to apply bloom filter match adjustments.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// The maximum number of headers we read at once when handling a range filter.
///
/// With ~530bytes per header this is ~500kb.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Threshold for enabling parallel processing in range mode.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Default concurrency for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
84
/// `Eth` filter RPC implementation.
///
/// This type handles `eth_` rpc requests related to filters and logs, e.g. `eth_newFilter`,
/// `eth_getFilterChanges`, `eth_getFilterLogs`, `eth_uninstallFilter` and `eth_getLogs`.
///
/// All state lives behind a shared [`Arc`], so clones refer to the same set of filters.
pub struct EthFilter<Eth: EthApiTypes> {
    /// All nested fields bundled together
    inner: Arc<EthFilterInner<Eth>>,
}
92
93impl<Eth> Clone for EthFilter<Eth>
94where
95    Eth: EthApiTypes,
96{
97    fn clone(&self) -> Self {
98        Self { inner: self.inner.clone() }
99    }
100}
101
102impl<Eth> EthFilter<Eth>
103where
104    Eth: EthApiTypes + 'static,
105{
106    /// Creates a new, shareable instance.
107    ///
108    /// This uses the given pool to get notified about new transactions, the provider to interact
109    /// with the blockchain, the cache to fetch cacheable data, like the logs.
110    ///
111    /// See also [`EthFilterConfig`].
112    ///
113    /// This also spawns a task that periodically clears stale filters.
114    ///
115    /// # Create a new instance with [`EthApi`](crate::EthApi)
116    ///
117    /// ```no_run
118    /// use reth_evm_ethereum::EthEvmConfig;
119    /// use reth_network_api::noop::NoopNetwork;
120    /// use reth_provider::noop::NoopProvider;
121    /// use reth_rpc::{EthApi, EthFilter};
122    /// use reth_tasks::TokioTaskExecutor;
123    /// use reth_transaction_pool::noop::NoopTransactionPool;
124    /// let eth_api = EthApi::builder(
125    ///     NoopProvider::default(),
126    ///     NoopTransactionPool::default(),
127    ///     NoopNetwork::default(),
128    ///     EthEvmConfig::mainnet(),
129    /// )
130    /// .build();
131    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
132    /// ```
133    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
134        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
135            config;
136        let inner = EthFilterInner {
137            eth_api,
138            active_filters: ActiveFilters::new(),
139            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
140            max_headers_range: MAX_HEADERS_RANGE,
141            task_spawner,
142            stale_filter_ttl,
143            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
144        };
145
146        let eth_filter = Self { inner: Arc::new(inner) };
147
148        let this = eth_filter.clone();
149        eth_filter.inner.task_spawner.spawn_critical(
150            "eth-filters_stale-filters-clean",
151            Box::pin(async move {
152                this.watch_and_clear_stale_filters().await;
153            }),
154        );
155
156        eth_filter
157    }
158
    /// Returns all currently active filters.
    ///
    /// The returned container is the one shared by every clone of this instance.
    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
        &self.inner.active_filters
    }
163
164    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
165    /// Nonetheless, this endless future frees the thread at every await point.
166    async fn watch_and_clear_stale_filters(&self) {
167        let mut interval = tokio::time::interval_at(
168            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
169            self.inner.stale_filter_ttl,
170        );
171        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
172        loop {
173            interval.tick().await;
174            self.clear_stale_filters(Instant::now()).await;
175        }
176    }
177
178    /// Clears all filters that have not been polled for longer than the configured
179    /// `stale_filter_ttl` at the given instant.
180    pub async fn clear_stale_filters(&self, now: Instant) {
181        trace!(target: "rpc::eth", "clear stale filters");
182        self.active_filters().inner.lock().await.retain(|id, filter| {
183            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
184
185            if !is_valid {
186                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
187            }
188
189            is_valid
190        })
191    }
192}
193
194impl<Eth> EthFilter<Eth>
195where
196    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
197{
    /// Access the underlying provider.
    ///
    /// Delegates to the wrapped `eth_api`.
    fn provider(&self) -> &Eth::Provider {
        self.inner.eth_api.provider()
    }
202
    /// Access the underlying pool.
    ///
    /// Delegates to the wrapped `eth_api`.
    fn pool(&self) -> &Eth::Pool {
        self.inner.eth_api.pool()
    }
207
208    /// Returns all the filter changes for the given id, if any
209    pub async fn filter_changes(
210        &self,
211        id: FilterId,
212    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
213        let info = self.provider().chain_info()?;
214        let best_number = info.best_number;
215
216        // start_block is the block from which we should start fetching changes, the next block from
217        // the last time changes were polled, in other words the best block at last poll + 1
218        let (start_block, kind) = {
219            let mut filters = self.inner.active_filters.inner.lock().await;
220            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
221
222            if filter.block > best_number {
223                // no new blocks since the last poll
224                return Ok(FilterChanges::Empty)
225            }
226
227            // update filter
228            // we fetch all changes from [filter.block..best_block], so we advance the filter's
229            // block to `best_block +1`, the next from which we should start fetching changes again
230            let mut block = best_number + 1;
231            std::mem::swap(&mut filter.block, &mut block);
232            filter.last_poll_timestamp = Instant::now();
233
234            (block, filter.kind.clone())
235        };
236
237        match kind {
238            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
239            FilterKind::Block => {
240                // Note: we need to fetch the block hashes from inclusive range
241                // [start_block..best_block]
242                let end_block = best_number + 1;
243                let block_hashes =
244                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
245                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
246                    )?;
247                Ok(FilterChanges::Hashes(block_hashes))
248            }
249            FilterKind::Log(filter) => {
250                let (from_block_number, to_block_number) = match filter.block_option {
251                    FilterBlockOption::Range { from_block, to_block } => {
252                        let from = from_block
253                            .map(|num| self.provider().convert_block_number(num))
254                            .transpose()?
255                            .flatten();
256                        let to = to_block
257                            .map(|num| self.provider().convert_block_number(num))
258                            .transpose()?
259                            .flatten();
260                        logs_utils::get_filter_block_range(from, to, start_block, info)
261                    }
262                    FilterBlockOption::AtBlockHash(_) => {
263                        // blockHash is equivalent to fromBlock = toBlock = the block number with
264                        // hash blockHash
265                        // get_logs_in_block_range is inclusive
266                        (start_block, best_number)
267                    }
268                };
269                let logs = self
270                    .inner
271                    .clone()
272                    .get_logs_in_block_range(
273                        *filter,
274                        from_block_number,
275                        to_block_number,
276                        self.inner.query_limits,
277                    )
278                    .await?;
279                Ok(FilterChanges::Logs(logs))
280            }
281        }
282    }
283
284    /// Returns an array of all logs matching filter with given id.
285    ///
286    /// Returns an error if no matching log filter exists.
287    ///
288    /// Handler for `eth_getFilterLogs`
289    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
290        let filter = {
291            let filters = self.inner.active_filters.inner.lock().await;
292            if let FilterKind::Log(ref filter) =
293                filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
294            {
295                *filter.clone()
296            } else {
297                // Not a log filter
298                return Err(EthFilterError::FilterNotFound(id))
299            }
300        };
301
302        self.logs_for_filter(filter, self.inner.query_limits).await
303    }
304
    /// Returns logs matching given filter object.
    ///
    /// Thin wrapper that clones the shared inner state and delegates to
    /// `EthFilterInner::logs_for_filter` with the given `limits`.
    async fn logs_for_filter(
        &self,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        self.inner.clone().logs_for_filter(filter, limits).await
    }
313}
314
315#[async_trait]
316impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
317where
318    Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
319{
320    /// Handler for `eth_newFilter`
321    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
322        trace!(target: "rpc::eth", "Serving eth_newFilter");
323        self.inner
324            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
325            .await
326    }
327
328    /// Handler for `eth_newBlockFilter`
329    async fn new_block_filter(&self) -> RpcResult<FilterId> {
330        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
331        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
332    }
333
334    /// Handler for `eth_newPendingTransactionFilter`
335    async fn new_pending_transaction_filter(
336        &self,
337        kind: Option<PendingTransactionFilterKind>,
338    ) -> RpcResult<FilterId> {
339        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
340
341        let transaction_kind = match kind.unwrap_or_default() {
342            PendingTransactionFilterKind::Hashes => {
343                let receiver = self.pool().pending_transactions_listener();
344                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
345                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
346            }
347            PendingTransactionFilterKind::Full => {
348                let stream = self.pool().new_pending_pool_transactions_listener();
349                let full_txs_receiver = FullTransactionsReceiver::new(
350                    stream,
351                    self.inner.eth_api.tx_resp_builder().clone(),
352                );
353                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
354                    full_txs_receiver,
355                )))
356            }
357        };
358
359        //let filter = FilterKind::PendingTransaction(transaction_kind);
360
361        // Install the filter and propagate any errors
362        self.inner.install_filter(transaction_kind).await
363    }
364
365    /// Handler for `eth_getFilterChanges`
366    async fn filter_changes(
367        &self,
368        id: FilterId,
369    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
370        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
371        Ok(Self::filter_changes(self, id).await?)
372    }
373
374    /// Returns an array of all logs matching filter with given id.
375    ///
376    /// Returns an error if no matching log filter exists.
377    ///
378    /// Handler for `eth_getFilterLogs`
379    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
380        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
381        Ok(Self::filter_logs(self, id).await?)
382    }
383
384    /// Handler for `eth_uninstallFilter`
385    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
386        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
387        let mut filters = self.inner.active_filters.inner.lock().await;
388        if filters.remove(&id).is_some() {
389            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
390            Ok(true)
391        } else {
392            Ok(false)
393        }
394    }
395
396    /// Returns logs matching given filter object.
397    ///
398    /// Handler for `eth_getLogs`
399    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
400        trace!(target: "rpc::eth", "Serving eth_getLogs");
401        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
402    }
403}
404
405impl<Eth> std::fmt::Debug for EthFilter<Eth>
406where
407    Eth: EthApiTypes,
408{
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        f.debug_struct("EthFilter").finish_non_exhaustive()
411    }
412}
413
/// Container type for the shared state of `EthFilter`.
///
/// `EthFilter` holds this behind an `Arc`, so all of its clones operate on the same instance.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Limits for logs queries (max blocks per filter, max logs per response)
    query_limits: QueryLimits,
    /// Maximum number of headers to read at once for range filter
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
432
433impl<Eth> EthFilterInner<Eth>
434where
435    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
436        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
437        + 'static,
438{
    /// Access the underlying provider.
    ///
    /// Delegates to the wrapped `eth_api`.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }
443
    /// Access the underlying [`EthStateCache`].
    ///
    /// Delegates to the wrapped `eth_api`.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
448
    /// Returns logs matching given filter object.
    ///
    /// - For a block hash filter, only the block with that hash is inspected. `limits` is not
    ///   consulted on this path.
    /// - For a range filter, the bounds are resolved to block numbers and the query is forwarded
    ///   to [`Self::get_logs_in_block_range`] together with `limits`.
    ///
    /// Returns an error if the header or the receipts of the requested block hash cannot be
    /// found, or if the underlying provider/cache fails.
    async fn logs_for_filter(
        self: Arc<Self>,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        match filter.block_option {
            FilterBlockOption::AtBlockHash(block_hash) => {
                // for all matching logs in the block
                // get the block header with the hash
                let header = self
                    .provider()
                    .header_by_hash_or_number(block_hash.into())?
                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;

                let block_num_hash = BlockNumHash::new(header.number(), block_hash);

                // we also need to ensure that the receipts are available and return an error if
                // not, in case the block hash has been reorged
                let (receipts, maybe_block) = self
                    .eth_cache()
                    .get_receipts_and_maybe_block(block_num_hash.hash)
                    .await?
                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;

                let mut all_logs = Vec::new();
                append_matching_block_logs(
                    &mut all_logs,
                    // prefer the block handed back by the cache, fall back to the provider
                    maybe_block
                        .map(ProviderOrBlock::Block)
                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                    &filter,
                    block_num_hash,
                    &receipts,
                    false,
                    header.timestamp(),
                )?;

                Ok(all_logs)
            }
            FilterBlockOption::Range { from_block, to_block } => {
                // compute the range
                let info = self.provider().chain_info()?;

                // we start at the most recent block if unset in filter
                let start_block = info.best_number;
                let from = from_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();
                let to = to_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();
                let (from_block_number, to_block_number) =
                    logs_utils::get_filter_block_range(from, to, start_block, info);
                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
                    .await
            }
        }
    }
510
511    /// Installs a new filter and returns the new identifier.
512    async fn install_filter(
513        &self,
514        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
515    ) -> RpcResult<FilterId> {
516        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
517        let subscription_id = self.id_provider.next_id();
518
519        let id = match subscription_id {
520            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
521            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
522        };
523        let mut filters = self.active_filters.inner.lock().await;
524        filters.insert(
525            id.clone(),
526            ActiveFilter {
527                block: last_poll_block_number,
528                last_poll_timestamp: Instant::now(),
529                kind,
530            },
531        );
532        Ok(id)
533    }
534
535    /// Returns all logs in the given _inclusive_ range that match the filter
536    ///
537    /// Returns an error if:
538    ///  - underlying database error
539    ///  - amount of matches exceeds configured limit
540    async fn get_logs_in_block_range(
541        self: Arc<Self>,
542        filter: Filter,
543        from_block: u64,
544        to_block: u64,
545        limits: QueryLimits,
546    ) -> Result<Vec<Log>, EthFilterError> {
547        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
548
549        // perform boundary checks first
550        if to_block < from_block {
551            return Err(EthFilterError::InvalidBlockRangeParams)
552        }
553
554        if let Some(max_blocks_per_filter) =
555            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
556        {
557            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
558        }
559
560        let (tx, rx) = oneshot::channel();
561        let this = self.clone();
562        self.task_spawner.spawn_blocking(Box::pin(async move {
563            let res =
564                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
565            let _ = tx.send(res);
566        }));
567
568        rx.await.map_err(|_| EthFilterError::InternalError)?
569    }
570
    /// Returns all logs in the given _inclusive_ range that match the filter
    ///
    /// Note: This function uses a mix of blocking db operations for fetching indices and header
    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
    /// This function is considered blocking and should thus be spawned on a blocking task.
    ///
    /// The work happens in two phases: first all headers of the range are read in batches of
    /// `max_headers_range` and those whose bloom matches the filter are collected, then the
    /// receipts (and optionally blocks) for these headers are pulled from a `RangeMode` and
    /// scanned for matching logs.
    ///
    /// Returns an error if:
    ///  - underlying database error
    ///  - the range spans multiple blocks and the number of collected logs exceeds
    ///    `limits.max_logs_per_response`
    async fn get_logs_in_block_range_inner(
        self: Arc<Self>,
        filter: &Filter,
        from_block: u64,
        to_block: u64,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        let mut all_logs = Vec::new();
        let mut matching_headers = Vec::new();

        // get current chain tip to determine processing mode
        let chain_tip = self.provider().best_block_number()?;

        // first collect all headers that match the bloom filter for cached mode decision
        for (from, to) in
            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
        {
            let headers = self.provider().headers_range(from..=to)?;

            // peekable, because the hash of a header can be taken from its successor
            let mut headers_iter = headers.into_iter().peekable();

            while let Some(header) = headers_iter.next() {
                if !filter.matches_bloom(header.logs_bloom()) {
                    continue
                }

                let current_number = header.number();

                let block_hash = match headers_iter.peek() {
                    Some(next_header) if next_header.number() == current_number + 1 => {
                        // Headers are consecutive, use the more efficient parent_hash
                        next_header.parent_hash()
                    }
                    _ => {
                        // Headers not consecutive or last header, calculate hash
                        header.hash_slow()
                    }
                };

                matching_headers.push(SealedHeader::new(header, block_hash));
            }
        }

        // initialize the appropriate range mode based on collected headers
        let mut range_mode = RangeMode::new(
            self.clone(),
            matching_headers,
            from_block,
            to_block,
            self.max_headers_range,
            chain_tip,
        );

        // iterate through the range mode to get receipts and blocks
        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
            range_mode.next().await?
        {
            let num_hash = header.num_hash();
            append_matching_block_logs(
                &mut all_logs,
                // use the recovered block if one was returned, otherwise fall back to the provider
                recovered_block
                    .map(ProviderOrBlock::Block)
                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                filter,
                num_hash,
                &receipts,
                false,
                header.timestamp(),
            )?;

            // size check but only if range is multiple blocks, so we always return all
            // logs of a single block
            let is_multi_block_range = from_block != to_block;
            if let Some(max_logs_per_response) = limits.max_logs_per_response {
                if is_multi_block_range && all_logs.len() > max_logs_per_response {
                    // the suggested retry range ends one block before the block whose logs
                    // pushed the response over the limit
                    debug!(
                        target: "rpc::eth::filter",
                        logs_found = all_logs.len(),
                        max_logs_per_response,
                        from_block,
                        to_block = num_hash.number.saturating_sub(1),
                        "Query exceeded max logs per response limit"
                    );
                    return Err(EthFilterError::QueryExceedsMaxResults {
                        max_logs: max_logs_per_response,
                        from_block,
                        to_block: num_hash.number.saturating_sub(1),
                    });
                }
            }
        }

        Ok(all_logs)
    }
673}
674
/// All active filters
///
/// The filters are keyed by their [`FilterId`] and stored behind a shared, async mutex, so
/// clones of this type refer to the same map.
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters<T> {
    /// Installed filters by id.
    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
}
680
681impl<T> ActiveFilters<T> {
682    /// Returns an empty instance.
683    pub fn new() -> Self {
684        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
685    }
686}
687
/// An installed filter
#[derive(Debug)]
struct ActiveFilter<T> {
    /// The block from which the next poll starts fetching changes.
    ///
    /// Set to the best block number when the filter is installed and advanced to
    /// `best block + 1` whenever a poll finds new blocks.
    block: u64,
    /// Last time this filter was polled (or installed, if it was never polled).
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
698
/// A receiver for pending transactions that returns all new transactions since the last poll.
///
/// Clones share the same underlying channel receiver.
#[derive(Debug, Clone)]
struct PendingTransactionsReceiver {
    /// Channel of pending transaction hashes, guarded by an async mutex.
    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
}
704
705impl PendingTransactionsReceiver {
706    fn new(receiver: Receiver<TxHash>) -> Self {
707        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
708    }
709
710    /// Returns all new pending transactions received since the last poll.
711    async fn drain<T>(&self) -> FilterChanges<T> {
712        let mut pending_txs = Vec::new();
713        let mut prepared_stream = self.txs_receiver.lock().await;
714
715        while let Ok(tx_hash) = prepared_stream.try_recv() {
716            pending_txs.push(tx_hash);
717        }
718
719        // Convert the vector of hashes into FilterChanges::Hashes
720        FilterChanges::Hashes(pending_txs)
721    }
722}
723
/// A structure to manage and provide access to a stream of full transaction details.
///
/// Clones share the same underlying transaction stream.
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
    /// Stream of new pool transactions, guarded by an async mutex.
    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
    /// Converts pool transactions into their RPC representation.
    tx_resp_builder: TxCompat,
}
730
731impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
732where
733    T: PoolTransaction + 'static,
734    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
735{
736    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
737    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
738        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
739    }
740
741    /// Returns all new pending transactions received since the last poll.
742    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
743        let mut pending_txs = Vec::new();
744        let mut prepared_stream = self.txs_stream.lock().await;
745
746        while let Ok(tx) = prepared_stream.try_recv() {
747            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
748                Ok(tx) => pending_txs.push(tx),
749                Err(err) => {
750                    error!(target: "rpc",
751                        %err,
752                        "Failed to fill txn with block context"
753                    );
754                }
755            }
756        }
757        FilterChanges::Transactions(pending_txs)
758    }
759}
760
/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
///
/// Used as a trait object inside [`PendingTransactionKind::FullTransaction`].
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the filter changes accumulated since the last call.
    async fn drain(&self) -> FilterChanges<T>;
}
766
#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
{
    /// Forwards to the inherent `drain` method of [`FullTransactionsReceiver`].
    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
        Self::drain(self).await
    }
}
778
/// Represents the kind of pending transaction data that can be retrieved.
///
/// This enum differentiates between two kinds of pending transaction data:
/// - Just the transaction hashes.
/// - Full transaction details.
#[derive(Debug, Clone)]
enum PendingTransactionKind<T> {
    /// Yields only the hashes of new pending transactions.
    Hashes(PendingTransactionsReceiver),
    /// Yields full transactions of type `T`, through a type-erased receiver.
    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
}
789
impl<T: 'static> PendingTransactionKind<T> {
    /// Drains the underlying receiver and returns what it collected since the last poll.
    async fn drain(&self) -> FilterChanges<T> {
        match self {
            Self::Hashes(receiver) => receiver.drain().await,
            Self::FullTransaction(receiver) => receiver.drain().await,
        }
    }
}
798
/// The kind of an installed filter, which decides what a poll of the filter returns.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// A log filter, installed via `eth_newFilter`.
    Log(Box<Filter>),
    /// A block filter, installed via `eth_newBlockFilter`.
    Block,
    /// A pending transaction filter, installed via `eth_newPendingTransactionFilter`.
    PendingTransaction(PendingTransactionKind<T>),
}
805
/// An iterator that yields _inclusive_ block ranges of a given step size
///
/// Every yielded `(start, end)` pair covers the blocks `start..=end`. A full chunk spans
/// `step + 1` blocks, the last chunk is clamped to the end of the overall range, and an empty
/// input range yields nothing.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block (inclusive) of the overall range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator that splits `range` into chunks of at most `step + 1` blocks.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Chunks are inclusive, so consecutive chunk starts are `step + 1` apart. The `+ 1`
        // also keeps the stride non-zero, which `step_by` requires. Saturate instead of
        // overflowing if `step` does not fit the stride type.
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start + step` can exceed `u64::MAX` close to the top of the number space; saturate
        // so the chunk is clamped to `self.end` instead of overflowing.
        let end = start.saturating_add(self.step).min(self.end);
        if start > end {
            return None
        }
        Some((start, end))
    }
}
832
/// Errors that can occur in the handler implementation
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// Filter not found.
    ///
    /// Carries the [`FilterId`] that was looked up; the id is not included in the error message.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// Invalid block range.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// Query scope is too broad.
    ///
    /// The payload is the maximum block range reported in the error message.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// Query result is too large.
    ///
    /// The message suggests retrying with the narrower `from_block`-`to_block` range.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response
        max_logs: usize,
        /// Start block of the suggested retry range
        from_block: u64,
        /// End block of the suggested retry range (last successfully processed block)
        to_block: u64,
    },
    /// Error serving request in `eth_` namespace.
    ///
    /// Displayed transparently, i.e. with the wrapped [`EthApiError`]'s own message.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Error thrown when a spawned task failed to deliver a response.
    #[error("internal filter error")]
    InternalError,
}
862
863impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
864    fn from(err: EthFilterError) -> Self {
865        match err {
866            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
867                jsonrpsee::types::error::INVALID_PARAMS_CODE,
868                "filter not found",
869            ),
870            err @ EthFilterError::InternalError => {
871                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
872            }
873            EthFilterError::EthAPIError(err) => err.into(),
874            err @ (EthFilterError::InvalidBlockRangeParams |
875            EthFilterError::QueryExceedsMaxBlocks(_) |
876            EthFilterError::QueryExceedsMaxResults { .. }) => {
877                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
878            }
879        }
880    }
881}
882
883impl From<ProviderError> for EthFilterError {
884    fn from(err: ProviderError) -> Self {
885        Self::EthAPIError(err.into())
886    }
887}
888
/// Helper type for the common pattern of returning receipts, block and the original header that is
/// a match for the filter.
///
/// This is the item type yielded by `RangeMode::next` and its two underlying modes.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// We always need the entire receipts for the matching block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// Block can be optional and we can fetch it lazily when needed.
    ///
    /// `None` means the producer did not have the recovered block at hand.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// The header of the block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
902
/// Represents different modes for processing block ranges when filtering logs
///
/// The variant is chosen once, in `RangeMode::new`, from the size of the requested range, its
/// distance from the chain tip and the number of headers to process.
enum RangeMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Use cache-based processing for recent blocks
    Cached(CachedMode<Eth>),
    /// Use range-based processing for older blocks
    Range(RangeBlockMode<Eth>),
}
912
913impl<
914        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
915    > RangeMode<Eth>
916{
917    /// Creates a new `RangeMode`.
918    fn new(
919        filter_inner: Arc<EthFilterInner<Eth>>,
920        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
921        from_block: u64,
922        to_block: u64,
923        max_headers_range: u64,
924        chain_tip: u64,
925    ) -> Self {
926        let block_count = to_block - from_block + 1;
927        let distance_from_tip = chain_tip.saturating_sub(to_block);
928
929        // Determine if we should use cached mode based on range characteristics
930        let use_cached_mode =
931            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
932
933        if use_cached_mode && !sealed_headers.is_empty() {
934            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
935        } else {
936            Self::Range(RangeBlockMode {
937                filter_inner,
938                iter: sealed_headers.into_iter().peekable(),
939                next: VecDeque::new(),
940                max_range: max_headers_range as usize,
941                pending_tasks: FuturesOrdered::new(),
942            })
943        }
944    }
945
946    /// Determines whether to use cached mode based on bloom filter matches and range size
947    const fn should_use_cached_mode(
948        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
949        block_count: u64,
950        distance_from_tip: u64,
951    ) -> bool {
952        // Headers are already filtered by bloom, so count equals length
953        let bloom_matches = headers.len();
954
955        // Calculate adjusted threshold based on bloom matches
956        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
957
958        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
959    }
960
961    /// Calculates the adjusted cache threshold based on bloom filter matches
962    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
963        // Only apply adjustments for larger ranges
964        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
965            return CACHED_MODE_BLOCK_THRESHOLD;
966        }
967
968        match bloom_matches {
969            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
970            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
971            _ => CACHED_MODE_BLOCK_THRESHOLD,
972        }
973    }
974
    /// Gets the next [`ReceiptBlockResult`] (receipts, optional recovered block and header) from
    /// whichever mode is active.
    ///
    /// Returns `Ok(None)` once the active mode has nothing left to yield.
    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
        match self {
            Self::Cached(cached) => cached.next().await,
            Self::Range(range) => range.next().await,
        }
    }
982}
983
/// Mode for processing blocks using cache optimization for recent blocks
struct CachedMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state; gives access to the cache used to look up receipts.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers still to be processed, consumed front to back.
    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
991
992impl<
993        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
994    > CachedMode<Eth>
995{
996    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
997        for header in self.headers_iter.by_ref() {
998            // Use get_receipts_and_maybe_block which has automatic fallback to provider
999            if let Some((receipts, maybe_block)) =
1000                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1001            {
1002                return Ok(Some(ReceiptBlockResult {
1003                    receipts,
1004                    recovered_block: maybe_block,
1005                    header,
1006                }));
1007            }
1008        }
1009
1010        Ok(None) // No more headers
1011    }
1012}
1013
/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
///
/// Each future is boxed, pinned and `Send`, and resolves to the results of one chunk of headers
/// or to the first error hit while fetching that chunk.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1017
/// Mode for processing blocks using range queries for older blocks
struct RangeBlockMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state; gives access to the cache and the provider.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers still to be processed; peekable so consecutive block numbers can be grouped.
    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
    /// Buffer of results that are ready to be returned, in FIFO order.
    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
    /// Maximum number of consecutive headers grouped into one range.
    max_range: usize,
    // Stream of ongoing receipt fetching tasks
    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
}
1029
1030impl<
1031        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1032    > RangeBlockMode<Eth>
1033{
    /// Returns the next block result, or `Ok(None)` once the buffer, the pending tasks and the
    /// header iterator are all exhausted.
    ///
    /// Each loop iteration tries, in order:
    /// 1. popping an already buffered result,
    /// 2. awaiting the next pending fetch task and buffering its results,
    /// 3. collecting up to `max_range` consecutive headers and either spawning parallel fetch
    ///    tasks for them or processing them sequentially.
    ///
    /// Errors from fetch tasks or from sequential processing are returned immediately.
    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
        loop {
            // First, try to return any already processed result from buffer
            if let Some(result) = self.next.pop_front() {
                return Ok(Some(result));
            }

            // Try to get a completed task result if there are pending tasks
            // (`FuturesOrdered` yields results in the order the tasks were pushed)
            if let Some(task_result) = self.pending_tasks.next().await {
                self.next.extend(task_result?);
                continue;
            }

            // No pending tasks - try to generate more work
            let Some(next_header) = self.iter.next() else {
                // No more headers to process
                return Ok(None);
            };

            let mut range_headers = Vec::with_capacity(self.max_range);
            range_headers.push(next_header);

            // Collect consecutive blocks up to max_range size
            while range_headers.len() < self.max_range {
                let Some(peeked) = self.iter.peek() else { break };
                let Some(last_header) = range_headers.last() else { break };

                let expected_next = last_header.number() + 1;
                if peeked.number() != expected_next {
                    debug!(
                        target: "rpc::eth::filter",
                        last_block = last_header.number(),
                        next_block = peeked.number(),
                        expected = expected_next,
                        range_size = range_headers.len(),
                        "Non-consecutive block detected, stopping range collection"
                    );
                    break; // Non-consecutive block, stop here
                }

                let Some(next_header) = self.iter.next() else { break };
                range_headers.push(next_header);
            }

            // Check if we should use parallel processing for large ranges
            // (counts the headers just collected plus everything still left in the iterator)
            let remaining_headers = self.iter.len() + range_headers.len();
            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
                self.spawn_parallel_tasks(range_headers);
                // Continue loop to await the spawned tasks
            } else {
                // Process small range sequentially and add results to buffer
                if let Some(result) = self.process_small_range(range_headers).await? {
                    return Ok(Some(result));
                }
                // Continue loop to check for more work
            }
        }
    }
1092
1093    /// Process a small range of headers sequentially
1094    ///
1095    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1096    async fn process_small_range(
1097        &mut self,
1098        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1099    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1100        // Process each header individually to avoid queuing for all receipts
1101        for header in range_headers {
1102            // First check if already cached to avoid unnecessary provider calls
1103            let (maybe_block, maybe_receipts) = self
1104                .filter_inner
1105                .eth_cache()
1106                .maybe_cached_block_and_receipts(header.hash())
1107                .await?;
1108
1109            let receipts = match maybe_receipts {
1110                Some(receipts) => receipts,
1111                None => {
1112                    // Not cached - fetch directly from provider
1113                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1114                        Some(receipts) => Arc::new(receipts),
1115                        None => continue, // No receipts found
1116                    }
1117                }
1118            };
1119
1120            if !receipts.is_empty() {
1121                self.next.push_back(ReceiptBlockResult {
1122                    receipts,
1123                    recovered_block: maybe_block,
1124                    header,
1125                });
1126            }
1127        }
1128
1129        Ok(self.next.pop_front())
1130    }
1131
    /// Spawn parallel tasks for processing a large range of headers
    ///
    /// This is used when the remaining headers count is at or above
    /// [`PARALLEL_PROCESSING_THRESHOLD`].
    ///
    /// The headers are split into chunks of `len / DEFAULT_PARALLEL_CONCURRENCY` headers (at
    /// least one per chunk). One future per chunk is pushed onto `pending_tasks`; each future
    /// runs its provider lookups on a blocking task. Nothing is awaited here.
    fn spawn_parallel_tasks(
        &mut self,
        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
    ) {
        // Split headers into chunks
        // (integer division, so the number of chunks can exceed DEFAULT_PARALLEL_CONCURRENCY)
        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
        let header_chunks = range_headers
            .into_iter()
            .chunks(chunk_size)
            .into_iter()
            .map(|chunk| chunk.collect::<Vec<_>>())
            .collect::<Vec<_>>();

        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
        for chunk_headers in header_chunks {
            let filter_inner = self.filter_inner.clone();
            let chunk_task = Box::pin(async move {
                let chunk_task = tokio::task::spawn_blocking(move || {
                    let mut chunk_results = Vec::new();

                    for header in chunk_headers {
                        // Fetch directly from provider - RangeMode is used for older blocks
                        // unlikely to be cached
                        let receipts = match filter_inner
                            .provider()
                            .receipts_by_block(header.hash().into())?
                        {
                            Some(receipts) => Arc::new(receipts),
                            None => continue, // No receipts found
                        };

                        // Blocks with an empty receipt list are dropped, as in the sequential
                        // path
                        if !receipts.is_empty() {
                            chunk_results.push(ReceiptBlockResult {
                                receipts,
                                recovered_block: None,
                                header,
                            });
                        }
                    }

                    Ok(chunk_results)
                });

                // Await the blocking task and handle the result
                match chunk_task.await {
                    Ok(Ok(chunk_results)) => Ok(chunk_results),
                    Ok(Err(e)) => Err(e),
                    // The blocking task itself failed to complete; the join error is only
                    // traced and surfaced to the caller as a generic internal error.
                    Err(join_err) => {
                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
                        Err(EthFilterError::InternalError)
                    }
                }
            });

            self.pending_tasks.push_back(chunk_task);
        }
    }
1193}
1194
1195#[cfg(test)]
1196mod tests {
1197    use super::*;
1198    use crate::{eth::EthApi, EthApiBuilder};
1199    use alloy_network::Ethereum;
1200    use alloy_primitives::FixedBytes;
1201    use rand::Rng;
1202    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1203    use reth_ethereum_primitives::TxType;
1204    use reth_evm_ethereum::EthEvmConfig;
1205    use reth_network_api::noop::NoopNetwork;
1206    use reth_provider::test_utils::MockEthProvider;
1207    use reth_rpc_convert::RpcConverter;
1208    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1209    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1210    use reth_tasks::TokioTaskExecutor;
1211    use reth_testing_utils::generators;
1212    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1213    use std::{collections::VecDeque, sync::Arc};
1214
    /// Checks `BlockRangeInclusiveIter` on a random range: the first chunk starts at the range
    /// start, every following chunk starts right after the previous one ends, and the last chunk
    /// ends at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        // `u32`/`u16` samples widened to `u64` keep `start + step` far away from overflow
        let start = rng.random::<u32>() as u64;
        let end = start.saturating_add(rng.random::<u32>() as u64);
        let step = rng.random::<u16>() as u64;
        let range = start..=end;
        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
        // the range is never empty (`end >= start`), so a first chunk always exists
        let (from, mut end) = iter.next().unwrap();
        assert_eq!(from, start);
        assert_eq!(end, (from + step).min(*range.end()));

        for (next_from, next_end) in iter {
            // ensure range starts with previous end + 1
            assert_eq!(next_from, end + 1);
            end = next_end;
        }

        // the last chunk must finish exactly at the end of the requested range
        assert_eq!(end, *range.end());
    }
1236
    // Helper function to create a test EthApi instance
    //
    // The instance is built from the given mock provider, a testing transaction pool, a no-op
    // network and an `EthEvmConfig` derived from the provider's chain spec. No other builder
    // options are set.
    #[expect(clippy::type_complexity)]
    fn build_test_eth_api(
        provider: MockEthProvider,
    ) -> EthApi<
        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
    > {
        EthApiBuilder::new(
            provider.clone(),
            testing_pool(),
            NoopNetwork::default(),
            EthEvmConfig::new(provider.chain_spec()),
        )
        .build()
    }
1253
1254    #[tokio::test]
1255    async fn test_range_block_mode_empty_range() {
1256        let provider = MockEthProvider::default();
1257        let eth_api = build_test_eth_api(provider);
1258
1259        let eth_filter = super::EthFilter::new(
1260            eth_api,
1261            EthFilterConfig::default(),
1262            Box::new(TokioTaskExecutor::default()),
1263        );
1264        let filter_inner = eth_filter.inner;
1265
1266        let headers = vec![];
1267        let max_range = 100;
1268
1269        let mut range_mode = RangeBlockMode {
1270            filter_inner,
1271            iter: headers.into_iter().peekable(),
1272            next: VecDeque::new(),
1273            max_range,
1274            pending_tasks: FuturesOrdered::new(),
1275        };
1276
1277        let result = range_mode.next().await;
1278        assert!(result.is_ok());
1279        assert!(result.unwrap().is_none());
1280    }
1281
    /// Results that are already buffered in `next` must be returned first, in FIFO order and
    /// unchanged, before any of the remaining headers is processed.
    #[tokio::test]
    async fn test_range_block_mode_queued_results_priority() {
        let provider = MockEthProvider::default();
        let eth_api = build_test_eth_api(provider);

        let eth_filter = super::EthFilter::new(
            eth_api,
            EthFilterConfig::default(),
            Box::new(TokioTaskExecutor::default()),
        );
        let filter_inner = eth_filter.inner;

        // headers left in the iterator; they must not be touched while the buffer is non-empty
        let headers = vec![
            SealedHeader::new(
                alloy_consensus::Header { number: 100, ..Default::default() },
                FixedBytes::random(),
            ),
            SealedHeader::new(
                alloy_consensus::Header { number: 101, ..Default::default() },
                FixedBytes::random(),
            ),
        ];

        // create specific mock results to test ordering
        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);

        // create mock receipts to test receipt handling
        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 100_000,
            logs: vec![],
            success: true,
        };
        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip1559,
            cumulative_gas_used: 200_000,
            logs: vec![],
            success: true,
        };
        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip2930,
            cumulative_gas_used: 150_000,
            logs: vec![],
            success: false, // Different success status
        };

        let mock_result_1 = ReceiptBlockResult {
            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
            recovered_block: None,
            header: SealedHeader::new(
                alloy_consensus::Header { number: 42, ..Default::default() },
                expected_block_hash_1,
            ),
        };

        let mock_result_2 = ReceiptBlockResult {
            receipts: Arc::new(vec![mock_receipt_3.clone()]),
            recovered_block: None,
            header: SealedHeader::new(
                alloy_consensus::Header { number: 43, ..Default::default() },
                expected_block_hash_2,
            ),
        };

        let mut range_mode = RangeBlockMode {
            filter_inner,
            iter: headers.into_iter().peekable(),
            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
            max_range: 100,
            pending_tasks: FuturesOrdered::new(),
        };

        // first call should return the first queued result (FIFO order)
        let result1 = range_mode.next().await;
        assert!(result1.is_ok());
        let receipt_result1 = result1.unwrap().unwrap();
        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
        assert_eq!(receipt_result1.header.number, 42);

        // verify receipts
        assert_eq!(receipt_result1.receipts.len(), 2);
        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
        assert_eq!(
            receipt_result1.receipts[0].cumulative_gas_used,
            mock_receipt_1.cumulative_gas_used
        );
        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
        assert_eq!(
            receipt_result1.receipts[1].cumulative_gas_used,
            mock_receipt_2.cumulative_gas_used
        );
        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);

        // second call should return the second queued result
        let result2 = range_mode.next().await;
        assert!(result2.is_ok());
        let receipt_result2 = result2.unwrap().unwrap();
        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
        assert_eq!(receipt_result2.header.number, 43);

        // verify receipts
        assert_eq!(receipt_result2.receipts.len(), 1);
        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
        assert_eq!(
            receipt_result2.receipts[0].cumulative_gas_used,
            mock_receipt_3.cumulative_gas_used
        );
        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);

        // queue should now be empty
        assert!(range_mode.next.is_empty());

        // third call moves on to the header iterator; only the absence of an error is asserted
        let result3 = range_mode.next().await;
        assert!(result3.is_ok());
    }
1399
1400    #[tokio::test]
1401    async fn test_range_block_mode_single_block_no_receipts() {
1402        let provider = MockEthProvider::default();
1403        let eth_api = build_test_eth_api(provider);
1404
1405        let eth_filter = super::EthFilter::new(
1406            eth_api,
1407            EthFilterConfig::default(),
1408            Box::new(TokioTaskExecutor::default()),
1409        );
1410        let filter_inner = eth_filter.inner;
1411
1412        let headers = vec![SealedHeader::new(
1413            alloy_consensus::Header { number: 100, ..Default::default() },
1414            FixedBytes::random(),
1415        )];
1416
1417        let mut range_mode = RangeBlockMode {
1418            filter_inner,
1419            iter: headers.into_iter().peekable(),
1420            next: VecDeque::new(),
1421            max_range: 100,
1422            pending_tasks: FuturesOrdered::new(),
1423        };
1424
1425        let result = range_mode.next().await;
1426        assert!(result.is_ok());
1427    }
1428
    /// Receipts registered in the mock provider must be returned block by block, in header
    /// order; the third block (102) has no receipts registered and must not produce a result.
    #[tokio::test]
    async fn test_range_block_mode_provider_receipts() {
        let provider = MockEthProvider::default();

        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };

        let block_hash_1 = FixedBytes::random();
        let block_hash_2 = FixedBytes::random();
        let block_hash_3 = FixedBytes::random();

        provider.add_header(block_hash_1, header_1.clone());
        provider.add_header(block_hash_2, header_2.clone());
        provider.add_header(block_hash_3, header_3.clone());

        // create mock receipts to test provider fetching with mock logs
        let mock_log = alloy_primitives::Log {
            address: alloy_primitives::Address::ZERO,
            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
        };

        let receipt_100_1 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 21_000,
            logs: vec![mock_log.clone()],
            success: true,
        };
        let receipt_100_2 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip1559,
            cumulative_gas_used: 42_000,
            logs: vec![mock_log.clone()],
            success: true,
        };
        let receipt_101_1 = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Eip2930,
            cumulative_gas_used: 30_000,
            logs: vec![mock_log.clone()],
            success: false,
        };

        // receipts are registered for blocks 100 and 101 only
        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
        provider.add_receipts(101, vec![receipt_101_1.clone()]);

        let eth_api = build_test_eth_api(provider);

        let eth_filter = super::EthFilter::new(
            eth_api,
            EthFilterConfig::default(),
            Box::new(TokioTaskExecutor::default()),
        );
        let filter_inner = eth_filter.inner;

        let headers = vec![
            SealedHeader::new(header_1, block_hash_1),
            SealedHeader::new(header_2, block_hash_2),
            SealedHeader::new(header_3, block_hash_3),
        ];

        let mut range_mode = RangeBlockMode {
            filter_inner,
            iter: headers.into_iter().peekable(),
            next: VecDeque::new(),
            max_range: 3, // include the 3 blocks in the first queried results
            pending_tasks: FuturesOrdered::new(),
        };

        // first call should fetch receipts from provider and return first block with receipts
        let result = range_mode.next().await;
        assert!(result.is_ok());
        let receipt_result = result.unwrap().unwrap();

        assert_eq!(receipt_result.header.hash(), block_hash_1);
        assert_eq!(receipt_result.header.number, 100);
        assert_eq!(receipt_result.receipts.len(), 2);

        // verify receipts
        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
        assert_eq!(
            receipt_result.receipts[0].cumulative_gas_used,
            receipt_100_1.cumulative_gas_used
        );
        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);

        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
        assert_eq!(
            receipt_result.receipts[1].cumulative_gas_used,
            receipt_100_2.cumulative_gas_used
        );
        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);

        // second call should return the second block with receipts
        let result2 = range_mode.next().await;
        assert!(result2.is_ok());
        let receipt_result2 = result2.unwrap().unwrap();

        assert_eq!(receipt_result2.header.hash(), block_hash_2);
        assert_eq!(receipt_result2.header.number, 101);
        assert_eq!(receipt_result2.receipts.len(), 1);

        // verify receipts
        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
        assert_eq!(
            receipt_result2.receipts[0].cumulative_gas_used,
            receipt_101_1.cumulative_gas_used
        );
        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);

        // third call should return None since no more blocks with receipts
        let result3 = range_mode.next().await;
        assert!(result3.is_ok());
        assert!(result3.unwrap().is_none());
    }
1542
    /// With `max_range: 1` every call to `next` consumes exactly one header; once both headers
    /// are consumed the iterator is empty and further calls return `None`.
    #[tokio::test]
    async fn test_range_block_mode_iterator_exhaustion() {
        let provider = MockEthProvider::default();

        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };

        let block_hash_100 = FixedBytes::random();
        let block_hash_101 = FixedBytes::random();

        // Associate headers with hashes first
        provider.add_header(block_hash_100, header_100.clone());
        provider.add_header(block_hash_101, header_101.clone());

        // Add mock receipts so headers are actually processed
        let mock_receipt = reth_ethereum_primitives::Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 21_000,
            logs: vec![],
            success: true,
        };
        provider.add_receipts(100, vec![mock_receipt.clone()]);
        provider.add_receipts(101, vec![mock_receipt.clone()]);

        let eth_api = build_test_eth_api(provider);

        let eth_filter = super::EthFilter::new(
            eth_api,
            EthFilterConfig::default(),
            Box::new(TokioTaskExecutor::default()),
        );
        let filter_inner = eth_filter.inner;

        let headers = vec![
            SealedHeader::new(header_100, block_hash_100),
            SealedHeader::new(header_101, block_hash_101),
        ];

        // a range size of one forces the two headers to be handled by separate `next` calls
        let mut range_mode = RangeBlockMode {
            filter_inner,
            iter: headers.into_iter().peekable(),
            next: VecDeque::new(),
            max_range: 1,
            pending_tasks: FuturesOrdered::new(),
        };

        let result1 = range_mode.next().await;
        assert!(result1.is_ok());
        assert!(result1.unwrap().is_some()); // Should have processed block 100

        assert!(range_mode.iter.peek().is_some()); // Should still have block 101

        let result2 = range_mode.next().await;
        assert!(result2.is_ok());
        assert!(result2.unwrap().is_some()); // Should have processed block 101

        // now iterator should be exhausted
        assert!(range_mode.iter.peek().is_none());

        // further calls should return None
        let result3 = range_mode.next().await;
        assert!(result3.is_ok());
        assert!(result3.unwrap().is_none());
    }
1607
1608    #[tokio::test]
1609    async fn test_cached_mode_with_mock_receipts() {
1610        // create test data
1611        let test_hash = FixedBytes::from([42u8; 32]);
1612        let test_block_number = 100u64;
1613        let test_header = SealedHeader::new(
1614            alloy_consensus::Header {
1615                number: test_block_number,
1616                gas_used: 50_000,
1617                ..Default::default()
1618            },
1619            test_hash,
1620        );
1621
1622        // add a mock receipt to the provider with a mock log
1623        let mock_log = alloy_primitives::Log {
1624            address: alloy_primitives::Address::ZERO,
1625            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1626        };
1627
1628        let mock_receipt = reth_ethereum_primitives::Receipt {
1629            tx_type: TxType::Legacy,
1630            cumulative_gas_used: 21_000,
1631            logs: vec![mock_log],
1632            success: true,
1633        };
1634
1635        let provider = MockEthProvider::default();
1636        provider.add_header(test_hash, test_header.header().clone());
1637        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1638
1639        let eth_api = build_test_eth_api(provider);
1640        let eth_filter = super::EthFilter::new(
1641            eth_api,
1642            EthFilterConfig::default(),
1643            Box::new(TokioTaskExecutor::default()),
1644        );
1645        let filter_inner = eth_filter.inner;
1646
1647        let headers = vec![test_header.clone()];
1648
1649        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1650
1651        // should find the receipt from provider fallback (cache will be empty)
1652        let result = cached_mode.next().await.expect("next should succeed");
1653        let receipt_block_result = result.expect("should have receipt result");
1654        assert_eq!(receipt_block_result.header.hash(), test_hash);
1655        assert_eq!(receipt_block_result.header.number, test_block_number);
1656        assert_eq!(receipt_block_result.receipts.len(), 1);
1657        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1658        assert_eq!(
1659            receipt_block_result.receipts[0].cumulative_gas_used,
1660            mock_receipt.cumulative_gas_used
1661        );
1662        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1663
1664        // iterator should be exhausted
1665        let result2 = cached_mode.next().await;
1666        assert!(result2.is_ok());
1667        assert!(result2.unwrap().is_none());
1668    }
1669
1670    #[tokio::test]
1671    async fn test_cached_mode_empty_headers() {
1672        let provider = MockEthProvider::default();
1673        let eth_api = build_test_eth_api(provider);
1674
1675        let eth_filter = super::EthFilter::new(
1676            eth_api,
1677            EthFilterConfig::default(),
1678            Box::new(TokioTaskExecutor::default()),
1679        );
1680        let filter_inner = eth_filter.inner;
1681
1682        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1683
1684        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1685
1686        // should immediately return None for empty headers
1687        let result = cached_mode.next().await.expect("next should succeed");
1688        assert!(result.is_none());
1689    }
1690
1691    #[tokio::test]
1692    async fn test_non_consecutive_headers_after_bloom_filter() {
1693        let provider = MockEthProvider::default();
1694
1695        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1696        let mut expected_hashes = vec![];
1697        let mut prev_hash = alloy_primitives::B256::default();
1698
1699        // Create a transaction for blocks that will have receipts
1700        use alloy_consensus::TxLegacy;
1701        use reth_ethereum_primitives::{TransactionSigned, TxType};
1702
1703        let tx_inner = TxLegacy {
1704            chain_id: Some(1),
1705            nonce: 0,
1706            gas_price: 21_000,
1707            gas_limit: 21_000,
1708            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1709            value: alloy_primitives::U256::ZERO,
1710            input: alloy_primitives::Bytes::new(),
1711        };
1712        let signature = alloy_primitives::Signature::test_signature();
1713        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1714
1715        for i in 100u64..=103 {
1716            let header = alloy_consensus::Header {
1717                number: i,
1718                parent_hash: prev_hash,
1719                // Set bloom to match filter only for blocks 100 and 102
1720                logs_bloom: if i == 100 || i == 102 {
1721                    alloy_primitives::Bloom::from([1u8; 256])
1722                } else {
1723                    alloy_primitives::Bloom::default()
1724                },
1725                ..Default::default()
1726            };
1727
1728            let hash = header.hash_slow();
1729            expected_hashes.push(hash);
1730            prev_hash = hash;
1731
1732            // Add transaction to blocks that will have receipts (100 and 102)
1733            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1734
1735            let block = reth_ethereum_primitives::Block {
1736                header,
1737                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1738            };
1739            provider.add_block(hash, block);
1740        }
1741
1742        // Add receipts with logs only to blocks that match bloom
1743        let mock_log = alloy_primitives::Log {
1744            address: alloy_primitives::Address::ZERO,
1745            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1746        };
1747
1748        let receipt = reth_ethereum_primitives::Receipt {
1749            tx_type: TxType::Legacy,
1750            cumulative_gas_used: 21_000,
1751            logs: vec![mock_log],
1752            success: true,
1753        };
1754
1755        provider.add_receipts(100, vec![receipt.clone()]);
1756        provider.add_receipts(101, vec![]);
1757        provider.add_receipts(102, vec![receipt.clone()]);
1758        provider.add_receipts(103, vec![]);
1759
1760        // Add block body indices for each block so receipts can be fetched
1761        use reth_db_api::models::StoredBlockBodyIndices;
1762        provider
1763            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1764        provider
1765            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1766        provider
1767            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1768        provider
1769            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1770
1771        let eth_api = build_test_eth_api(provider);
1772        let eth_filter = EthFilter::new(
1773            eth_api,
1774            EthFilterConfig::default(),
1775            Box::new(TokioTaskExecutor::default()),
1776        );
1777
1778        // Use default filter which will match any non-empty bloom
1779        let filter = Filter::default();
1780
1781        // Get logs in the range - this will trigger the bloom filtering
1782        let logs = eth_filter
1783            .inner
1784            .clone()
1785            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1786            .await
1787            .expect("should succeed");
1788
1789        // We should get logs from blocks 100 and 102 only (bloom filtered)
1790        assert_eq!(logs.len(), 2);
1791
1792        assert_eq!(logs[0].block_number, Some(100));
1793        assert_eq!(logs[1].block_number, Some(102));
1794
1795        // Each block hash should be the hash of its own header, not derived from any other header
1796        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1797        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1798    }
1799}