reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8    PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12    future::TryFutureExt,
13    stream::{FuturesOrdered, StreamExt},
14    Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21    helpers::{EthBlocks, LoadReceipt},
22    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23    RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32    ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37    collections::{HashMap, VecDeque},
38    fmt,
39    iter::{Peekable, StepBy},
40    ops::RangeInclusive,
41    pin::Pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::{
46    sync::{mpsc::Receiver, oneshot, Mutex},
47    time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53    Eth: FullEthApiTypes
54        + RpcNodeCoreExt<Provider: BlockIdReader>
55        + LoadReceipt
56        + EthBlocks
57        + 'static,
58{
59    /// Returns logs matching given filter object, no query limits
60    fn logs(
61        &self,
62        filter: Filter,
63        limits: QueryLimits,
64    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65        trace!(target: "rpc::eth", "Serving eth_getLogs");
66        self.logs_for_filter(filter, limits).map_err(|e| e.into())
67    }
68}
69
/// Threshold used to decide between cached mode and range mode processing.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Number of bloom filter matches that triggers reduced caching.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Number of bloom filter matches that triggers moderately reduced caching.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum block count for which the bloom filter match adjustments are applied.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Upper bound on the number of headers read at once while handling a range filter.
///
/// With ~530 bytes per header a full batch is roughly 500kb.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Threshold for turning on parallel processing in range mode.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1_000;

/// Concurrency used by default for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91/// `Eth` filter RPC implementation.
92///
93/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
94pub struct EthFilter<Eth: EthApiTypes> {
95    /// All nested fields bundled together
96    inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101    Eth: EthApiTypes,
102{
103    fn clone(&self) -> Self {
104        Self { inner: self.inner.clone() }
105    }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110    Eth: EthApiTypes + 'static,
111{
112    /// Creates a new, shareable instance.
113    ///
114    /// This uses the given pool to get notified about new transactions, the provider to interact
115    /// with the blockchain, the cache to fetch cacheable data, like the logs.
116    ///
117    /// See also [`EthFilterConfig`].
118    ///
119    /// This also spawns a task that periodically clears stale filters.
120    ///
121    /// # Create a new instance with [`EthApi`](crate::EthApi)
122    ///
123    /// ```no_run
124    /// use reth_evm_ethereum::EthEvmConfig;
125    /// use reth_network_api::noop::NoopNetwork;
126    /// use reth_provider::noop::NoopProvider;
127    /// use reth_rpc::{EthApi, EthFilter};
128    /// use reth_tasks::TokioTaskExecutor;
129    /// use reth_transaction_pool::noop::NoopTransactionPool;
130    /// let eth_api = EthApi::builder(
131    ///     NoopProvider::default(),
132    ///     NoopTransactionPool::default(),
133    ///     NoopNetwork::default(),
134    ///     EthEvmConfig::mainnet(),
135    /// )
136    /// .build();
137    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
138    /// ```
139    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
140        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141            config;
142        let inner = EthFilterInner {
143            eth_api,
144            active_filters: ActiveFilters::new(),
145            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146            max_headers_range: MAX_HEADERS_RANGE,
147            task_spawner,
148            stale_filter_ttl,
149            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150        };
151
152        let eth_filter = Self { inner: Arc::new(inner) };
153
154        let this = eth_filter.clone();
155        eth_filter.inner.task_spawner.spawn_critical(
156            "eth-filters_stale-filters-clean",
157            Box::pin(async move {
158                this.watch_and_clear_stale_filters().await;
159            }),
160        );
161
162        eth_filter
163    }
164
165    /// Returns all currently active filters
166    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167        &self.inner.active_filters
168    }
169
170    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
171    /// Nonetheless, this endless future frees the thread at every await point.
172    async fn watch_and_clear_stale_filters(&self) {
173        let mut interval = tokio::time::interval_at(
174            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175            self.inner.stale_filter_ttl,
176        );
177        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178        loop {
179            interval.tick().await;
180            self.clear_stale_filters(Instant::now()).await;
181        }
182    }
183
184    /// Clears all filters that have not been polled for longer than the configured
185    /// `stale_filter_ttl` at the given instant.
186    pub async fn clear_stale_filters(&self, now: Instant) {
187        trace!(target: "rpc::eth", "clear stale filters");
188        self.active_filters().inner.lock().await.retain(|id, filter| {
189            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
190
191            if !is_valid {
192                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
193            }
194
195            is_valid
196        })
197    }
198}
199
200impl<Eth> EthFilter<Eth>
201where
202    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
203        + RpcNodeCoreExt
204        + LoadReceipt
205        + EthBlocks
206        + 'static,
207{
208    /// Access the underlying provider.
209    fn provider(&self) -> &Eth::Provider {
210        self.inner.eth_api.provider()
211    }
212
213    /// Access the underlying pool.
214    fn pool(&self) -> &Eth::Pool {
215        self.inner.eth_api.pool()
216    }
217
218    /// Returns all the filter changes for the given id, if any
219    pub async fn filter_changes(
220        &self,
221        id: FilterId,
222    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
223        let info = self.provider().chain_info()?;
224        let best_number = info.best_number;
225
226        // start_block is the block from which we should start fetching changes, the next block from
227        // the last time changes were polled, in other words the best block at last poll + 1
228        let (start_block, kind) = {
229            let mut filters = self.inner.active_filters.inner.lock().await;
230            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
231
232            if filter.block > best_number {
233                // no new blocks since the last poll
234                return Ok(FilterChanges::Empty)
235            }
236
237            // update filter
238            // we fetch all changes from [filter.block..best_block], so we advance the filter's
239            // block to `best_block +1`, the next from which we should start fetching changes again
240            let mut block = best_number + 1;
241            std::mem::swap(&mut filter.block, &mut block);
242            filter.last_poll_timestamp = Instant::now();
243
244            (block, filter.kind.clone())
245        };
246
247        match kind {
248            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
249            FilterKind::Block => {
250                // Note: we need to fetch the block hashes from inclusive range
251                // [start_block..best_block]
252                let end_block = best_number + 1;
253                let block_hashes =
254                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
255                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
256                    )?;
257                Ok(FilterChanges::Hashes(block_hashes))
258            }
259            FilterKind::Log(filter) => {
260                let (from_block_number, to_block_number) = match filter.block_option {
261                    FilterBlockOption::Range { from_block, to_block } => {
262                        let from = from_block
263                            .map(|num| self.provider().convert_block_number(num))
264                            .transpose()?
265                            .flatten();
266                        let to = to_block
267                            .map(|num| self.provider().convert_block_number(num))
268                            .transpose()?
269                            .flatten();
270                        logs_utils::get_filter_block_range(from, to, start_block, info)
271                    }
272                    FilterBlockOption::AtBlockHash(_) => {
273                        // blockHash is equivalent to fromBlock = toBlock = the block number with
274                        // hash blockHash
275                        // get_logs_in_block_range is inclusive
276                        (start_block, best_number)
277                    }
278                };
279                let logs = self
280                    .inner
281                    .clone()
282                    .get_logs_in_block_range(
283                        *filter,
284                        from_block_number,
285                        to_block_number,
286                        self.inner.query_limits,
287                    )
288                    .await?;
289                Ok(FilterChanges::Logs(logs))
290            }
291        }
292    }
293
294    /// Returns an array of all logs matching filter with given id.
295    ///
296    /// Returns an error if no matching log filter exists.
297    ///
298    /// Handler for `eth_getFilterLogs`
299    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
300        let filter = {
301            let mut filters = self.inner.active_filters.inner.lock().await;
302            let filter =
303                filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
304            if let FilterKind::Log(ref inner_filter) = filter.kind {
305                filter.last_poll_timestamp = Instant::now();
306                *inner_filter.clone()
307            } else {
308                // Not a log filter
309                return Err(EthFilterError::FilterNotFound(id))
310            }
311        };
312
313        self.logs_for_filter(filter, self.inner.query_limits).await
314    }
315
316    /// Returns logs matching given filter object.
317    async fn logs_for_filter(
318        &self,
319        filter: Filter,
320        limits: QueryLimits,
321    ) -> Result<Vec<Log>, EthFilterError> {
322        self.inner.clone().logs_for_filter(filter, limits).await
323    }
324}
325
326#[async_trait]
327impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
328where
329    Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
330{
331    /// Handler for `eth_newFilter`
332    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
333        trace!(target: "rpc::eth", "Serving eth_newFilter");
334        self.inner
335            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
336            .await
337    }
338
339    /// Handler for `eth_newBlockFilter`
340    async fn new_block_filter(&self) -> RpcResult<FilterId> {
341        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
342        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
343    }
344
345    /// Handler for `eth_newPendingTransactionFilter`
346    async fn new_pending_transaction_filter(
347        &self,
348        kind: Option<PendingTransactionFilterKind>,
349    ) -> RpcResult<FilterId> {
350        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
351
352        let transaction_kind = match kind.unwrap_or_default() {
353            PendingTransactionFilterKind::Hashes => {
354                let receiver = self.pool().pending_transactions_listener();
355                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
356                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
357            }
358            PendingTransactionFilterKind::Full => {
359                let stream = self.pool().new_pending_pool_transactions_listener();
360                let full_txs_receiver = FullTransactionsReceiver::new(
361                    stream,
362                    dyn_clone::clone(self.inner.eth_api.tx_resp_builder()),
363                );
364                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
365                    full_txs_receiver,
366                )))
367            }
368        };
369
370        // Install the filter and propagate any errors
371        self.inner.install_filter(transaction_kind).await
372    }
373
374    /// Handler for `eth_getFilterChanges`
375    async fn filter_changes(
376        &self,
377        id: FilterId,
378    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
379        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
380        Ok(Self::filter_changes(self, id).await?)
381    }
382
383    /// Returns an array of all logs matching filter with given id.
384    ///
385    /// Returns an error if no matching log filter exists.
386    ///
387    /// Handler for `eth_getFilterLogs`
388    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
389        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
390        Ok(Self::filter_logs(self, id).await?)
391    }
392
393    /// Handler for `eth_uninstallFilter`
394    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
395        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
396        let mut filters = self.inner.active_filters.inner.lock().await;
397        if filters.remove(&id).is_some() {
398            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
399            Ok(true)
400        } else {
401            Ok(false)
402        }
403    }
404
405    /// Returns logs matching given filter object.
406    ///
407    /// Handler for `eth_getLogs`
408    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
409        trace!(target: "rpc::eth", "Serving eth_getLogs");
410        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
411    }
412}
413
414impl<Eth> std::fmt::Debug for EthFilter<Eth>
415where
416    Eth: EthApiTypes,
417{
418    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419        f.debug_struct("EthFilter").finish_non_exhaustive()
420    }
421}
422
/// Container type `EthFilter`
///
/// Holds the state that an `EthFilter` handle keeps behind its `Arc`.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Limits for logs queries (maximum blocks per filter and maximum logs per response).
    query_limits: QueryLimits,
    /// Maximum number of headers to read at once for a range filter.
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
441
442impl<Eth> EthFilterInner<Eth>
443where
444    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
445        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
446        + LoadReceipt
447        + EthBlocks
448        + 'static,
449{
    /// Access the underlying provider.
    ///
    /// Forwards to the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }

    /// Access the underlying [`EthStateCache`].
    ///
    /// Forwards to the wrapped `eth` API.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
459
460    /// Returns logs matching given filter object.
461    async fn logs_for_filter(
462        self: Arc<Self>,
463        filter: Filter,
464        limits: QueryLimits,
465    ) -> Result<Vec<Log>, EthFilterError> {
466        match filter.block_option {
467            FilterBlockOption::AtBlockHash(block_hash) => {
468                // for all matching logs in the block
469                // get the block header with the hash
470                let header = self
471                    .provider()
472                    .header_by_hash_or_number(block_hash.into())?
473                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
474
475                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
476
477                // we also need to ensure that the receipts are available and return an error if
478                // not, in case the block hash been reorged
479                let (receipts, maybe_block) = self
480                    .eth_cache()
481                    .get_receipts_and_maybe_block(block_num_hash.hash)
482                    .await?
483                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
484
485                let mut all_logs = Vec::new();
486                append_matching_block_logs(
487                    &mut all_logs,
488                    maybe_block
489                        .map(ProviderOrBlock::Block)
490                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
491                    &filter,
492                    block_num_hash,
493                    &receipts,
494                    false,
495                    header.timestamp(),
496                )?;
497
498                Ok(all_logs)
499            }
500            FilterBlockOption::Range { from_block, to_block } => {
501                // Handle special case where from block is pending
502                if from_block.is_some_and(|b| b.is_pending()) {
503                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
504                    if !(to_block.is_pending() || to_block.is_number()) {
505                        // always empty range
506                        return Ok(Vec::new());
507                    }
508                    // Try to get pending block and receipts
509                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
510                        if let BlockNumberOrTag::Number(to_block) = to_block &&
511                            to_block < pending_block.block.number()
512                        {
513                            // this block range is empty based on the user input
514                            return Ok(Vec::new());
515                        }
516
517                        let info = self.provider().chain_info()?;
518                        if pending_block.block.number() > info.best_number {
519                            // only consider the pending block if it is ahead of the chain
520                            let mut all_logs = Vec::new();
521                            let timestamp = pending_block.block.timestamp();
522                            let block_num_hash = pending_block.block.num_hash();
523                            append_matching_block_logs(
524                                &mut all_logs,
525                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
526                                &filter,
527                                block_num_hash,
528                                &pending_block.receipts,
529                                false, // removed = false for pending blocks
530                                timestamp,
531                            )?;
532                            return Ok(all_logs);
533                        }
534                    }
535                }
536
537                let info = self.provider().chain_info()?;
538                let start_block = info.best_number;
539                let from = from_block
540                    .map(|num| self.provider().convert_block_number(num))
541                    .transpose()?
542                    .flatten();
543                let to = to_block
544                    .map(|num| self.provider().convert_block_number(num))
545                    .transpose()?
546                    .flatten();
547
548                if let Some(f) = from &&
549                    f > info.best_number
550                {
551                    // start block higher than local head, can return empty
552                    return Ok(Vec::new());
553                }
554
555                let (from_block_number, to_block_number) =
556                    logs_utils::get_filter_block_range(from, to, start_block, info);
557
558                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
559                    .await
560            }
561        }
562    }
563
564    /// Installs a new filter and returns the new identifier.
565    async fn install_filter(
566        &self,
567        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
568    ) -> RpcResult<FilterId> {
569        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
570        let subscription_id = self.id_provider.next_id();
571
572        let id = match subscription_id {
573            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
574            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
575        };
576        let mut filters = self.active_filters.inner.lock().await;
577        filters.insert(
578            id.clone(),
579            ActiveFilter {
580                block: last_poll_block_number,
581                last_poll_timestamp: Instant::now(),
582                kind,
583            },
584        );
585        Ok(id)
586    }
587
588    /// Returns all logs in the given _inclusive_ range that match the filter
589    ///
590    /// Returns an error if:
591    ///  - underlying database error
592    ///  - amount of matches exceeds configured limit
593    async fn get_logs_in_block_range(
594        self: Arc<Self>,
595        filter: Filter,
596        from_block: u64,
597        to_block: u64,
598        limits: QueryLimits,
599    ) -> Result<Vec<Log>, EthFilterError> {
600        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
601
602        // perform boundary checks first
603        if to_block < from_block {
604            return Err(EthFilterError::InvalidBlockRangeParams)
605        }
606
607        if let Some(max_blocks_per_filter) =
608            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
609        {
610            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
611        }
612
613        let (tx, rx) = oneshot::channel();
614        let this = self.clone();
615        self.task_spawner.spawn_blocking(Box::pin(async move {
616            let res =
617                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
618            let _ = tx.send(res);
619        }));
620
621        rx.await.map_err(|_| EthFilterError::InternalError)?
622    }
623
624    /// Returns all logs in the given _inclusive_ range that match the filter
625    ///
626    /// Note: This function uses a mix of blocking db operations for fetching indices and header
627    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
628    /// This function is considered blocking and should thus be spawned on a blocking task.
629    ///
630    /// Returns an error if:
631    ///  - underlying database error
632    async fn get_logs_in_block_range_inner(
633        self: Arc<Self>,
634        filter: &Filter,
635        from_block: u64,
636        to_block: u64,
637        limits: QueryLimits,
638    ) -> Result<Vec<Log>, EthFilterError> {
639        let mut all_logs = Vec::new();
640        let mut matching_headers = Vec::new();
641
642        // get current chain tip to determine processing mode
643        let chain_tip = self.provider().best_block_number()?;
644
645        // first collect all headers that match the bloom filter for cached mode decision
646        for (from, to) in
647            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
648        {
649            let headers = self.provider().headers_range(from..=to)?;
650
651            let mut headers_iter = headers.into_iter().peekable();
652
653            while let Some(header) = headers_iter.next() {
654                if !filter.matches_bloom(header.logs_bloom()) {
655                    continue
656                }
657
658                let current_number = header.number();
659
660                let block_hash = match headers_iter.peek() {
661                    Some(next_header) if next_header.number() == current_number + 1 => {
662                        // Headers are consecutive, use the more efficient parent_hash
663                        next_header.parent_hash()
664                    }
665                    _ => {
666                        // Headers not consecutive or last header, calculate hash
667                        header.hash_slow()
668                    }
669                };
670
671                matching_headers.push(SealedHeader::new(header, block_hash));
672            }
673        }
674
675        // initialize the appropriate range mode based on collected headers
676        let mut range_mode = RangeMode::new(
677            self.clone(),
678            matching_headers,
679            from_block,
680            to_block,
681            self.max_headers_range,
682            chain_tip,
683        );
684
685        // iterate through the range mode to get receipts and blocks
686        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
687            range_mode.next().await?
688        {
689            let num_hash = header.num_hash();
690            append_matching_block_logs(
691                &mut all_logs,
692                recovered_block
693                    .map(ProviderOrBlock::Block)
694                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
695                filter,
696                num_hash,
697                &receipts,
698                false,
699                header.timestamp(),
700            )?;
701
702            // size check but only if range is multiple blocks, so we always return all
703            // logs of a single block
704            let is_multi_block_range = from_block != to_block;
705            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
706                is_multi_block_range &&
707                all_logs.len() > max_logs_per_response
708            {
709                debug!(
710                    target: "rpc::eth::filter",
711                    logs_found = all_logs.len(),
712                    max_logs_per_response,
713                    from_block,
714                    to_block = num_hash.number.saturating_sub(1),
715                    "Query exceeded max logs per response limit"
716                );
717                return Err(EthFilterError::QueryExceedsMaxResults {
718                    max_logs: max_logs_per_response,
719                    from_block,
720                    to_block: num_hash.number.saturating_sub(1),
721                });
722            }
723        }
724
725        Ok(all_logs)
726    }
727}
728
729/// All active filters
730#[derive(Debug, Clone, Default)]
731pub struct ActiveFilters<T> {
732    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
733}
734
735impl<T> ActiveFilters<T> {
736    /// Returns an empty instance.
737    pub fn new() -> Self {
738        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
739    }
740}
741
/// An installed filter
#[derive(Debug)]
struct ActiveFilter<T> {
    /// At which block the filter was polled last.
    ///
    /// Set to the best block number when the filter is installed and moved to
    /// `best_number + 1` by every poll that sees new blocks, i.e. the block the next poll
    /// starts from.
    block: u64,
    /// Last time this filter was polled.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
752
753/// A receiver for pending transactions that returns all new transactions since the last poll.
754#[derive(Debug, Clone)]
755struct PendingTransactionsReceiver {
756    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
757}
758
759impl PendingTransactionsReceiver {
760    fn new(receiver: Receiver<TxHash>) -> Self {
761        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
762    }
763
764    /// Returns all new pending transactions received since the last poll.
765    async fn drain<T>(&self) -> FilterChanges<T> {
766        let mut pending_txs = Vec::new();
767        let mut prepared_stream = self.txs_receiver.lock().await;
768
769        while let Ok(tx_hash) = prepared_stream.try_recv() {
770            pending_txs.push(tx_hash);
771        }
772
773        // Convert the vector of hashes into FilterChanges::Hashes
774        FilterChanges::Hashes(pending_txs)
775    }
776}
777
778/// A structure to manage and provide access to a stream of full transaction details.
779#[derive(Debug, Clone)]
780struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
781    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
782    tx_resp_builder: TxCompat,
783}
784
785impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
786where
787    T: PoolTransaction + 'static,
788    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
789{
790    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
791    fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
792        Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
793    }
794
795    /// Returns all new pending transactions received since the last poll.
796    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
797        let mut pending_txs = Vec::new();
798        let mut prepared_stream = self.txs_stream.lock().await;
799
800        while let Ok(tx) = prepared_stream.try_recv() {
801            match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
802                Ok(tx) => pending_txs.push(tx),
803                Err(err) => {
804                    error!(target: "rpc",
805                        %err,
806                        "Failed to fill txn with block context"
807                    );
808                }
809            }
810        }
811        FilterChanges::Transactions(pending_txs)
812    }
813}
814
815/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
816#[async_trait]
817trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
818    async fn drain(&self) -> FilterChanges<T>;
819}
820
821#[async_trait]
822impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
823    for FullTransactionsReceiver<T, TxCompat>
824where
825    T: PoolTransaction + 'static,
826    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
827{
828    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
829        Self::drain(self).await
830    }
831}
832
833/// Represents the kind of pending transaction data that can be retrieved.
834///
835/// This enum differentiates between two kinds of pending transaction data:
836/// - Just the transaction hashes.
837/// - Full transaction details.
838#[derive(Debug, Clone)]
839enum PendingTransactionKind<T> {
840    Hashes(PendingTransactionsReceiver),
841    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
842}
843
844impl<T: 'static> PendingTransactionKind<T> {
845    async fn drain(&self) -> FilterChanges<T> {
846        match self {
847            Self::Hashes(receiver) => receiver.drain().await,
848            Self::FullTransaction(receiver) => receiver.drain().await,
849        }
850    }
851}
852
853#[derive(Clone, Debug)]
854enum FilterKind<T> {
855    Log(Box<Filter>),
856    Block,
857    PendingTransaction(PendingTransactionKind<T>),
858}
859
/// An iterator that splits a block range into consecutive _inclusive_ `(from, to)` sub-ranges.
///
/// Every yielded pair satisfies `to - from <= step`, i.e. a sub-range covers at most `step + 1`
/// blocks. The last pair is clamped to the end of the original range. An empty input range
/// yields nothing.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of each sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block of the overall range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose sub-ranges span `step + 1` blocks each.
    ///
    /// Never panics: a `step` too large for the stride to be represented as `usize` saturates
    /// instead of overflowing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Both ends of a sub-range are inclusive, so consecutive sub-ranges start `step + 1`
        // blocks apart. `step as usize + 1` would overflow for `u64::MAX` and silently truncate
        // on 32-bit targets, hence the checked conversion and saturating increment.
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next `(from, to)` pair, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate so that ranges close to `u64::MAX` are clamped rather than overflowing.
        // `start` never exceeds `self.end` (it comes from the range itself), so the resulting
        // `end` is always `>= start`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
886
887/// Errors that can occur in the handler implementation
888#[derive(Debug, thiserror::Error)]
889pub enum EthFilterError {
890    /// Filter not found.
891    #[error("filter not found")]
892    FilterNotFound(FilterId),
893    /// Invalid block range.
894    #[error("invalid block range params")]
895    InvalidBlockRangeParams,
896    /// Query scope is too broad.
897    #[error("query exceeds max block range {0}")]
898    QueryExceedsMaxBlocks(u64),
899    /// Query result is too large.
900    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
901    QueryExceedsMaxResults {
902        /// Maximum number of logs allowed per response
903        max_logs: usize,
904        /// Start block of the suggested retry range
905        from_block: u64,
906        /// End block of the suggested retry range (last successfully processed block)
907        to_block: u64,
908    },
909    /// Error serving request in `eth_` namespace.
910    #[error(transparent)]
911    EthAPIError(#[from] EthApiError),
912    /// Error thrown when a spawned task failed to deliver a response.
913    #[error("internal filter error")]
914    InternalError,
915}
916
917impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
918    fn from(err: EthFilterError) -> Self {
919        match err {
920            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
921                jsonrpsee::types::error::INVALID_PARAMS_CODE,
922                "filter not found",
923            ),
924            err @ EthFilterError::InternalError => {
925                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
926            }
927            EthFilterError::EthAPIError(err) => err.into(),
928            err @ (EthFilterError::InvalidBlockRangeParams |
929            EthFilterError::QueryExceedsMaxBlocks(_) |
930            EthFilterError::QueryExceedsMaxResults { .. }) => {
931                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
932            }
933        }
934    }
935}
936
937impl From<ProviderError> for EthFilterError {
938    fn from(err: ProviderError) -> Self {
939        Self::EthAPIError(err.into())
940    }
941}
942
943/// Helper type for the common pattern of returning receipts, block and the original header that is
944/// a match for the filter.
945struct ReceiptBlockResult<P>
946where
947    P: ReceiptProvider + BlockReader,
948{
949    /// We always need the entire receipts for the matching block.
950    receipts: Arc<Vec<ProviderReceipt<P>>>,
951    /// Block can be optional and we can fetch it lazily when needed.
952    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
953    /// The header of the block.
954    header: SealedHeader<<P as HeaderProvider>::Header>,
955}
956
957/// Represents different modes for processing block ranges when filtering logs
958enum RangeMode<
959    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
960        + EthApiTypes
961        + LoadReceipt
962        + EthBlocks
963        + 'static,
964> {
965    /// Use cache-based processing for recent blocks
966    Cached(CachedMode<Eth>),
967    /// Use range-based processing for older blocks
968    Range(RangeBlockMode<Eth>),
969}
970
971impl<
972        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
973            + EthApiTypes
974            + LoadReceipt
975            + EthBlocks
976            + 'static,
977    > RangeMode<Eth>
978{
979    /// Creates a new `RangeMode`.
980    fn new(
981        filter_inner: Arc<EthFilterInner<Eth>>,
982        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
983        from_block: u64,
984        to_block: u64,
985        max_headers_range: u64,
986        chain_tip: u64,
987    ) -> Self {
988        let block_count = to_block - from_block + 1;
989        let distance_from_tip = chain_tip.saturating_sub(to_block);
990
991        // Determine if we should use cached mode based on range characteristics
992        let use_cached_mode =
993            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
994
995        if use_cached_mode && !sealed_headers.is_empty() {
996            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
997        } else {
998            Self::Range(RangeBlockMode {
999                filter_inner,
1000                iter: sealed_headers.into_iter().peekable(),
1001                next: VecDeque::new(),
1002                max_range: max_headers_range as usize,
1003                pending_tasks: FuturesOrdered::new(),
1004            })
1005        }
1006    }
1007
1008    /// Determines whether to use cached mode based on bloom filter matches and range size
1009    const fn should_use_cached_mode(
1010        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1011        block_count: u64,
1012        distance_from_tip: u64,
1013    ) -> bool {
1014        // Headers are already filtered by bloom, so count equals length
1015        let bloom_matches = headers.len();
1016
1017        // Calculate adjusted threshold based on bloom matches
1018        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1019
1020        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1021    }
1022
1023    /// Calculates the adjusted cache threshold based on bloom filter matches
1024    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1025        // Only apply adjustments for larger ranges
1026        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1027            return CACHED_MODE_BLOCK_THRESHOLD;
1028        }
1029
1030        match bloom_matches {
1031            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1032            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1033            _ => CACHED_MODE_BLOCK_THRESHOLD,
1034        }
1035    }
1036
1037    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
1038    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1039        match self {
1040            Self::Cached(cached) => cached.next().await,
1041            Self::Range(range) => range.next().await,
1042        }
1043    }
1044}
1045
1046/// Mode for processing blocks using cache optimization for recent blocks
1047struct CachedMode<
1048    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1049        + EthApiTypes
1050        + LoadReceipt
1051        + EthBlocks
1052        + 'static,
1053> {
1054    filter_inner: Arc<EthFilterInner<Eth>>,
1055    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1056}
1057
1058impl<
1059        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1060            + EthApiTypes
1061            + LoadReceipt
1062            + EthBlocks
1063            + 'static,
1064    > CachedMode<Eth>
1065{
1066    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1067        for header in self.headers_iter.by_ref() {
1068            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1069            if let Some((receipts, maybe_block)) =
1070                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1071            {
1072                return Ok(Some(ReceiptBlockResult {
1073                    receipts,
1074                    recovered_block: maybe_block,
1075                    header,
1076                }));
1077            }
1078        }
1079
1080        Ok(None) // No more headers
1081    }
1082}
1083
1084/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1085type ReceiptFetchFuture<P> =
1086    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1087
1088/// Mode for processing blocks using range queries for older blocks
1089struct RangeBlockMode<
1090    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1091        + EthApiTypes
1092        + LoadReceipt
1093        + EthBlocks
1094        + 'static,
1095> {
1096    filter_inner: Arc<EthFilterInner<Eth>>,
1097    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1098    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1099    max_range: usize,
1100    // Stream of ongoing receipt fetching tasks
1101    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1102}
1103
1104impl<
1105        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1106            + EthApiTypes
1107            + LoadReceipt
1108            + EthBlocks
1109            + 'static,
1110    > RangeBlockMode<Eth>
1111{
1112    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1113        loop {
1114            // First, try to return any already processed result from buffer
1115            if let Some(result) = self.next.pop_front() {
1116                return Ok(Some(result));
1117            }
1118
1119            // Try to get a completed task result if there are pending tasks
1120            if let Some(task_result) = self.pending_tasks.next().await {
1121                self.next.extend(task_result?);
1122                continue;
1123            }
1124
1125            // No pending tasks - try to generate more work
1126            let Some(next_header) = self.iter.next() else {
1127                // No more headers to process
1128                return Ok(None);
1129            };
1130
1131            let mut range_headers = Vec::with_capacity(self.max_range);
1132            range_headers.push(next_header);
1133
1134            // Collect consecutive blocks up to max_range size
1135            while range_headers.len() < self.max_range {
1136                let Some(peeked) = self.iter.peek() else { break };
1137                let Some(last_header) = range_headers.last() else { break };
1138
1139                let expected_next = last_header.number() + 1;
1140                if peeked.number() != expected_next {
1141                    trace!(
1142                        target: "rpc::eth::filter",
1143                        last_block = last_header.number(),
1144                        next_block = peeked.number(),
1145                        expected = expected_next,
1146                        range_size = range_headers.len(),
1147                        "Non-consecutive block detected, stopping range collection"
1148                    );
1149                    break; // Non-consecutive block, stop here
1150                }
1151
1152                let Some(next_header) = self.iter.next() else { break };
1153                range_headers.push(next_header);
1154            }
1155
1156            // Check if we should use parallel processing for large ranges
1157            let remaining_headers = self.iter.len() + range_headers.len();
1158            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1159                self.spawn_parallel_tasks(range_headers);
1160                // Continue loop to await the spawned tasks
1161            } else {
1162                // Process small range sequentially and add results to buffer
1163                if let Some(result) = self.process_small_range(range_headers).await? {
1164                    return Ok(Some(result));
1165                }
1166                // Continue loop to check for more work
1167            }
1168        }
1169    }
1170
1171    /// Process a small range of headers sequentially
1172    ///
1173    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1174    async fn process_small_range(
1175        &mut self,
1176        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1177    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1178        // Process each header individually to avoid queuing for all receipts
1179        for header in range_headers {
1180            // First check if already cached to avoid unnecessary provider calls
1181            let (maybe_block, maybe_receipts) = self
1182                .filter_inner
1183                .eth_cache()
1184                .maybe_cached_block_and_receipts(header.hash())
1185                .await?;
1186
1187            let receipts = match maybe_receipts {
1188                Some(receipts) => receipts,
1189                None => {
1190                    // Not cached - fetch directly from provider
1191                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1192                        Some(receipts) => Arc::new(receipts),
1193                        None => continue, // No receipts found
1194                    }
1195                }
1196            };
1197
1198            if !receipts.is_empty() {
1199                self.next.push_back(ReceiptBlockResult {
1200                    receipts,
1201                    recovered_block: maybe_block,
1202                    header,
1203                });
1204            }
1205        }
1206
1207        Ok(self.next.pop_front())
1208    }
1209
1210    /// Spawn parallel tasks for processing a large range of headers
1211    ///
1212    /// This is used when the remaining headers count is at or above
1213    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1214    fn spawn_parallel_tasks(
1215        &mut self,
1216        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1217    ) {
1218        // Split headers into chunks
1219        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1220        let header_chunks = range_headers
1221            .into_iter()
1222            .chunks(chunk_size)
1223            .into_iter()
1224            .map(|chunk| chunk.collect::<Vec<_>>())
1225            .collect::<Vec<_>>();
1226
1227        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1228        for chunk_headers in header_chunks {
1229            let filter_inner = self.filter_inner.clone();
1230            let chunk_task = Box::pin(async move {
1231                let chunk_task = tokio::task::spawn_blocking(move || {
1232                    let mut chunk_results = Vec::new();
1233
1234                    for header in chunk_headers {
1235                        // Fetch directly from provider - RangeMode is used for older blocks
1236                        // unlikely to be cached
1237                        let receipts = match filter_inner
1238                            .provider()
1239                            .receipts_by_block(header.hash().into())?
1240                        {
1241                            Some(receipts) => Arc::new(receipts),
1242                            None => continue, // No receipts found
1243                        };
1244
1245                        if !receipts.is_empty() {
1246                            chunk_results.push(ReceiptBlockResult {
1247                                receipts,
1248                                recovered_block: None,
1249                                header,
1250                            });
1251                        }
1252                    }
1253
1254                    Ok(chunk_results)
1255                });
1256
1257                // Await the blocking task and handle the result
1258                match chunk_task.await {
1259                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1260                    Ok(Err(e)) => Err(e),
1261                    Err(join_err) => {
1262                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1263                        Err(EthFilterError::InternalError)
1264                    }
1265                }
1266            });
1267
1268            self.pending_tasks.push_back(chunk_task);
1269        }
1270    }
1271}
1272
1273#[cfg(test)]
1274mod tests {
1275    use super::*;
1276    use crate::{eth::EthApi, EthApiBuilder};
1277    use alloy_network::Ethereum;
1278    use alloy_primitives::FixedBytes;
1279    use rand::Rng;
1280    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1281    use reth_ethereum_primitives::TxType;
1282    use reth_evm_ethereum::EthEvmConfig;
1283    use reth_network_api::noop::NoopNetwork;
1284    use reth_provider::test_utils::MockEthProvider;
1285    use reth_rpc_convert::RpcConverter;
1286    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1287    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1288    use reth_tasks::TokioTaskExecutor;
1289    use reth_testing_utils::generators;
1290    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1291    use std::{collections::VecDeque, sync::Arc};
1292
1293    #[test]
1294    fn test_block_range_iter() {
1295        let mut rng = generators::rng();
1296
1297        let start = rng.random::<u32>() as u64;
1298        let end = start.saturating_add(rng.random::<u32>() as u64);
1299        let step = rng.random::<u16>() as u64;
1300        let range = start..=end;
1301        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1302        let (from, mut end) = iter.next().unwrap();
1303        assert_eq!(from, start);
1304        assert_eq!(end, (from + step).min(*range.end()));
1305
1306        for (next_from, next_end) in iter {
1307            // ensure range starts with previous end + 1
1308            assert_eq!(next_from, end + 1);
1309            end = next_end;
1310        }
1311
1312        assert_eq!(end, *range.end());
1313    }
1314
1315    // Helper function to create a test EthApi instance
1316    #[expect(clippy::type_complexity)]
1317    fn build_test_eth_api(
1318        provider: MockEthProvider,
1319    ) -> EthApi<
1320        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1321        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1322    > {
1323        EthApiBuilder::new(
1324            provider.clone(),
1325            testing_pool(),
1326            NoopNetwork::default(),
1327            EthEvmConfig::new(provider.chain_spec()),
1328        )
1329        .build()
1330    }
1331
1332    #[tokio::test]
1333    async fn test_range_block_mode_empty_range() {
1334        let provider = MockEthProvider::default();
1335        let eth_api = build_test_eth_api(provider);
1336
1337        let eth_filter = super::EthFilter::new(
1338            eth_api,
1339            EthFilterConfig::default(),
1340            Box::new(TokioTaskExecutor::default()),
1341        );
1342        let filter_inner = eth_filter.inner;
1343
1344        let headers = vec![];
1345        let max_range = 100;
1346
1347        let mut range_mode = RangeBlockMode {
1348            filter_inner,
1349            iter: headers.into_iter().peekable(),
1350            next: VecDeque::new(),
1351            max_range,
1352            pending_tasks: FuturesOrdered::new(),
1353        };
1354
1355        let result = range_mode.next().await;
1356        assert!(result.is_ok());
1357        assert!(result.unwrap().is_none());
1358    }
1359
1360    #[tokio::test]
1361    async fn test_range_block_mode_queued_results_priority() {
1362        let provider = MockEthProvider::default();
1363        let eth_api = build_test_eth_api(provider);
1364
1365        let eth_filter = super::EthFilter::new(
1366            eth_api,
1367            EthFilterConfig::default(),
1368            Box::new(TokioTaskExecutor::default()),
1369        );
1370        let filter_inner = eth_filter.inner;
1371
1372        let headers = vec![
1373            SealedHeader::new(
1374                alloy_consensus::Header { number: 100, ..Default::default() },
1375                FixedBytes::random(),
1376            ),
1377            SealedHeader::new(
1378                alloy_consensus::Header { number: 101, ..Default::default() },
1379                FixedBytes::random(),
1380            ),
1381        ];
1382
1383        // create specific mock results to test ordering
1384        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1385        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1386
1387        // create mock receipts to test receipt handling
1388        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1389            tx_type: TxType::Legacy,
1390            cumulative_gas_used: 100_000,
1391            logs: vec![],
1392            success: true,
1393        };
1394        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1395            tx_type: TxType::Eip1559,
1396            cumulative_gas_used: 200_000,
1397            logs: vec![],
1398            success: true,
1399        };
1400        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1401            tx_type: TxType::Eip2930,
1402            cumulative_gas_used: 150_000,
1403            logs: vec![],
1404            success: false, // Different success status
1405        };
1406
1407        let mock_result_1 = ReceiptBlockResult {
1408            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1409            recovered_block: None,
1410            header: SealedHeader::new(
1411                alloy_consensus::Header { number: 42, ..Default::default() },
1412                expected_block_hash_1,
1413            ),
1414        };
1415
1416        let mock_result_2 = ReceiptBlockResult {
1417            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1418            recovered_block: None,
1419            header: SealedHeader::new(
1420                alloy_consensus::Header { number: 43, ..Default::default() },
1421                expected_block_hash_2,
1422            ),
1423        };
1424
1425        let mut range_mode = RangeBlockMode {
1426            filter_inner,
1427            iter: headers.into_iter().peekable(),
1428            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1429            max_range: 100,
1430            pending_tasks: FuturesOrdered::new(),
1431        };
1432
1433        // first call should return the first queued result (FIFO order)
1434        let result1 = range_mode.next().await;
1435        assert!(result1.is_ok());
1436        let receipt_result1 = result1.unwrap().unwrap();
1437        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1438        assert_eq!(receipt_result1.header.number, 42);
1439
1440        // verify receipts
1441        assert_eq!(receipt_result1.receipts.len(), 2);
1442        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1443        assert_eq!(
1444            receipt_result1.receipts[0].cumulative_gas_used,
1445            mock_receipt_1.cumulative_gas_used
1446        );
1447        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1448        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1449        assert_eq!(
1450            receipt_result1.receipts[1].cumulative_gas_used,
1451            mock_receipt_2.cumulative_gas_used
1452        );
1453        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1454
1455        // second call should return the second queued result
1456        let result2 = range_mode.next().await;
1457        assert!(result2.is_ok());
1458        let receipt_result2 = result2.unwrap().unwrap();
1459        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1460        assert_eq!(receipt_result2.header.number, 43);
1461
1462        // verify receipts
1463        assert_eq!(receipt_result2.receipts.len(), 1);
1464        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1465        assert_eq!(
1466            receipt_result2.receipts[0].cumulative_gas_used,
1467            mock_receipt_3.cumulative_gas_used
1468        );
1469        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1470
1471        // queue should now be empty
1472        assert!(range_mode.next.is_empty());
1473
1474        let result3 = range_mode.next().await;
1475        assert!(result3.is_ok());
1476    }
1477
1478    #[tokio::test]
1479    async fn test_range_block_mode_single_block_no_receipts() {
1480        let provider = MockEthProvider::default();
1481        let eth_api = build_test_eth_api(provider);
1482
1483        let eth_filter = super::EthFilter::new(
1484            eth_api,
1485            EthFilterConfig::default(),
1486            Box::new(TokioTaskExecutor::default()),
1487        );
1488        let filter_inner = eth_filter.inner;
1489
1490        let headers = vec![SealedHeader::new(
1491            alloy_consensus::Header { number: 100, ..Default::default() },
1492            FixedBytes::random(),
1493        )];
1494
1495        let mut range_mode = RangeBlockMode {
1496            filter_inner,
1497            iter: headers.into_iter().peekable(),
1498            next: VecDeque::new(),
1499            max_range: 100,
1500            pending_tasks: FuturesOrdered::new(),
1501        };
1502
1503        let result = range_mode.next().await;
1504        assert!(result.is_ok());
1505    }
1506
1507    #[tokio::test]
1508    async fn test_range_block_mode_provider_receipts() {
1509        let provider = MockEthProvider::default();
1510
1511        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1512        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1513        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1514
1515        let block_hash_1 = FixedBytes::random();
1516        let block_hash_2 = FixedBytes::random();
1517        let block_hash_3 = FixedBytes::random();
1518
1519        provider.add_header(block_hash_1, header_1.clone());
1520        provider.add_header(block_hash_2, header_2.clone());
1521        provider.add_header(block_hash_3, header_3.clone());
1522
1523        // create mock receipts to test provider fetching with mock logs
1524        let mock_log = alloy_primitives::Log {
1525            address: alloy_primitives::Address::ZERO,
1526            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1527        };
1528
1529        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1530            tx_type: TxType::Legacy,
1531            cumulative_gas_used: 21_000,
1532            logs: vec![mock_log.clone()],
1533            success: true,
1534        };
1535        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1536            tx_type: TxType::Eip1559,
1537            cumulative_gas_used: 42_000,
1538            logs: vec![mock_log.clone()],
1539            success: true,
1540        };
1541        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1542            tx_type: TxType::Eip2930,
1543            cumulative_gas_used: 30_000,
1544            logs: vec![mock_log.clone()],
1545            success: false,
1546        };
1547
1548        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1549        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1550
1551        let eth_api = build_test_eth_api(provider);
1552
1553        let eth_filter = super::EthFilter::new(
1554            eth_api,
1555            EthFilterConfig::default(),
1556            Box::new(TokioTaskExecutor::default()),
1557        );
1558        let filter_inner = eth_filter.inner;
1559
1560        let headers = vec![
1561            SealedHeader::new(header_1, block_hash_1),
1562            SealedHeader::new(header_2, block_hash_2),
1563            SealedHeader::new(header_3, block_hash_3),
1564        ];
1565
1566        let mut range_mode = RangeBlockMode {
1567            filter_inner,
1568            iter: headers.into_iter().peekable(),
1569            next: VecDeque::new(),
1570            max_range: 3, // include the 3 blocks in the first queried results
1571            pending_tasks: FuturesOrdered::new(),
1572        };
1573
1574        // first call should fetch receipts from provider and return first block with receipts
1575        let result = range_mode.next().await;
1576        assert!(result.is_ok());
1577        let receipt_result = result.unwrap().unwrap();
1578
1579        assert_eq!(receipt_result.header.hash(), block_hash_1);
1580        assert_eq!(receipt_result.header.number, 100);
1581        assert_eq!(receipt_result.receipts.len(), 2);
1582
1583        // verify receipts
1584        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1585        assert_eq!(
1586            receipt_result.receipts[0].cumulative_gas_used,
1587            receipt_100_1.cumulative_gas_used
1588        );
1589        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1590
1591        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1592        assert_eq!(
1593            receipt_result.receipts[1].cumulative_gas_used,
1594            receipt_100_2.cumulative_gas_used
1595        );
1596        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1597
1598        // second call should return the second block with receipts
1599        let result2 = range_mode.next().await;
1600        assert!(result2.is_ok());
1601        let receipt_result2 = result2.unwrap().unwrap();
1602
1603        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1604        assert_eq!(receipt_result2.header.number, 101);
1605        assert_eq!(receipt_result2.receipts.len(), 1);
1606
1607        // verify receipts
1608        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1609        assert_eq!(
1610            receipt_result2.receipts[0].cumulative_gas_used,
1611            receipt_101_1.cumulative_gas_used
1612        );
1613        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1614
1615        // third call should return None since no more blocks with receipts
1616        let result3 = range_mode.next().await;
1617        assert!(result3.is_ok());
1618        assert!(result3.unwrap().is_none());
1619    }
1620
1621    #[tokio::test]
1622    async fn test_range_block_mode_iterator_exhaustion() {
1623        let provider = MockEthProvider::default();
1624
1625        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1626        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1627
1628        let block_hash_100 = FixedBytes::random();
1629        let block_hash_101 = FixedBytes::random();
1630
1631        // Associate headers with hashes first
1632        provider.add_header(block_hash_100, header_100.clone());
1633        provider.add_header(block_hash_101, header_101.clone());
1634
1635        // Add mock receipts so headers are actually processed
1636        let mock_receipt = reth_ethereum_primitives::Receipt {
1637            tx_type: TxType::Legacy,
1638            cumulative_gas_used: 21_000,
1639            logs: vec![],
1640            success: true,
1641        };
1642        provider.add_receipts(100, vec![mock_receipt.clone()]);
1643        provider.add_receipts(101, vec![mock_receipt.clone()]);
1644
1645        let eth_api = build_test_eth_api(provider);
1646
1647        let eth_filter = super::EthFilter::new(
1648            eth_api,
1649            EthFilterConfig::default(),
1650            Box::new(TokioTaskExecutor::default()),
1651        );
1652        let filter_inner = eth_filter.inner;
1653
1654        let headers = vec![
1655            SealedHeader::new(header_100, block_hash_100),
1656            SealedHeader::new(header_101, block_hash_101),
1657        ];
1658
1659        let mut range_mode = RangeBlockMode {
1660            filter_inner,
1661            iter: headers.into_iter().peekable(),
1662            next: VecDeque::new(),
1663            max_range: 1,
1664            pending_tasks: FuturesOrdered::new(),
1665        };
1666
1667        let result1 = range_mode.next().await;
1668        assert!(result1.is_ok());
1669        assert!(result1.unwrap().is_some()); // Should have processed block 100
1670
1671        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1672
1673        let result2 = range_mode.next().await;
1674        assert!(result2.is_ok());
1675        assert!(result2.unwrap().is_some()); // Should have processed block 101
1676
1677        // now iterator should be exhausted
1678        assert!(range_mode.iter.peek().is_none());
1679
1680        // further calls should return None
1681        let result3 = range_mode.next().await;
1682        assert!(result3.is_ok());
1683        assert!(result3.unwrap().is_none());
1684    }
1685
1686    #[tokio::test]
1687    async fn test_cached_mode_with_mock_receipts() {
1688        // create test data
1689        let test_hash = FixedBytes::from([42u8; 32]);
1690        let test_block_number = 100u64;
1691        let test_header = SealedHeader::new(
1692            alloy_consensus::Header {
1693                number: test_block_number,
1694                gas_used: 50_000,
1695                ..Default::default()
1696            },
1697            test_hash,
1698        );
1699
1700        // add a mock receipt to the provider with a mock log
1701        let mock_log = alloy_primitives::Log {
1702            address: alloy_primitives::Address::ZERO,
1703            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1704        };
1705
1706        let mock_receipt = reth_ethereum_primitives::Receipt {
1707            tx_type: TxType::Legacy,
1708            cumulative_gas_used: 21_000,
1709            logs: vec![mock_log],
1710            success: true,
1711        };
1712
1713        let provider = MockEthProvider::default();
1714        provider.add_header(test_hash, test_header.header().clone());
1715        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1716
1717        let eth_api = build_test_eth_api(provider);
1718        let eth_filter = super::EthFilter::new(
1719            eth_api,
1720            EthFilterConfig::default(),
1721            Box::new(TokioTaskExecutor::default()),
1722        );
1723        let filter_inner = eth_filter.inner;
1724
1725        let headers = vec![test_header.clone()];
1726
1727        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1728
1729        // should find the receipt from provider fallback (cache will be empty)
1730        let result = cached_mode.next().await.expect("next should succeed");
1731        let receipt_block_result = result.expect("should have receipt result");
1732        assert_eq!(receipt_block_result.header.hash(), test_hash);
1733        assert_eq!(receipt_block_result.header.number, test_block_number);
1734        assert_eq!(receipt_block_result.receipts.len(), 1);
1735        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1736        assert_eq!(
1737            receipt_block_result.receipts[0].cumulative_gas_used,
1738            mock_receipt.cumulative_gas_used
1739        );
1740        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1741
1742        // iterator should be exhausted
1743        let result2 = cached_mode.next().await;
1744        assert!(result2.is_ok());
1745        assert!(result2.unwrap().is_none());
1746    }
1747
1748    #[tokio::test]
1749    async fn test_cached_mode_empty_headers() {
1750        let provider = MockEthProvider::default();
1751        let eth_api = build_test_eth_api(provider);
1752
1753        let eth_filter = super::EthFilter::new(
1754            eth_api,
1755            EthFilterConfig::default(),
1756            Box::new(TokioTaskExecutor::default()),
1757        );
1758        let filter_inner = eth_filter.inner;
1759
1760        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1761
1762        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1763
1764        // should immediately return None for empty headers
1765        let result = cached_mode.next().await.expect("next should succeed");
1766        assert!(result.is_none());
1767    }
1768
1769    #[tokio::test]
1770    async fn test_non_consecutive_headers_after_bloom_filter() {
1771        let provider = MockEthProvider::default();
1772
1773        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1774        let mut expected_hashes = vec![];
1775        let mut prev_hash = alloy_primitives::B256::default();
1776
1777        // Create a transaction for blocks that will have receipts
1778        use alloy_consensus::TxLegacy;
1779        use reth_ethereum_primitives::{TransactionSigned, TxType};
1780
1781        let tx_inner = TxLegacy {
1782            chain_id: Some(1),
1783            nonce: 0,
1784            gas_price: 21_000,
1785            gas_limit: 21_000,
1786            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1787            value: alloy_primitives::U256::ZERO,
1788            input: alloy_primitives::Bytes::new(),
1789        };
1790        let signature = alloy_primitives::Signature::test_signature();
1791        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1792
1793        for i in 100u64..=103 {
1794            let header = alloy_consensus::Header {
1795                number: i,
1796                parent_hash: prev_hash,
1797                // Set bloom to match filter only for blocks 100 and 102
1798                logs_bloom: if i == 100 || i == 102 {
1799                    alloy_primitives::Bloom::from([1u8; 256])
1800                } else {
1801                    alloy_primitives::Bloom::default()
1802                },
1803                ..Default::default()
1804            };
1805
1806            let hash = header.hash_slow();
1807            expected_hashes.push(hash);
1808            prev_hash = hash;
1809
1810            // Add transaction to blocks that will have receipts (100 and 102)
1811            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1812
1813            let block = reth_ethereum_primitives::Block {
1814                header,
1815                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1816            };
1817            provider.add_block(hash, block);
1818        }
1819
1820        // Add receipts with logs only to blocks that match bloom
1821        let mock_log = alloy_primitives::Log {
1822            address: alloy_primitives::Address::ZERO,
1823            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1824        };
1825
1826        let receipt = reth_ethereum_primitives::Receipt {
1827            tx_type: TxType::Legacy,
1828            cumulative_gas_used: 21_000,
1829            logs: vec![mock_log],
1830            success: true,
1831        };
1832
1833        provider.add_receipts(100, vec![receipt.clone()]);
1834        provider.add_receipts(101, vec![]);
1835        provider.add_receipts(102, vec![receipt.clone()]);
1836        provider.add_receipts(103, vec![]);
1837
1838        // Add block body indices for each block so receipts can be fetched
1839        use reth_db_api::models::StoredBlockBodyIndices;
1840        provider
1841            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1842        provider
1843            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1844        provider
1845            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1846        provider
1847            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1848
1849        let eth_api = build_test_eth_api(provider);
1850        let eth_filter = EthFilter::new(
1851            eth_api,
1852            EthFilterConfig::default(),
1853            Box::new(TokioTaskExecutor::default()),
1854        );
1855
1856        // Use default filter which will match any non-empty bloom
1857        let filter = Filter::default();
1858
1859        // Get logs in the range - this will trigger the bloom filtering
1860        let logs = eth_filter
1861            .inner
1862            .clone()
1863            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1864            .await
1865            .expect("should succeed");
1866
1867        // We should get logs from blocks 100 and 102 only (bloom filtered)
1868        assert_eq!(logs.len(), 2);
1869
1870        assert_eq!(logs[0].block_number, Some(100));
1871        assert_eq!(logs[1].block_number, Some(102));
1872
1873        // Each block hash should be the hash of its own header, not derived from any other header
1874        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1875        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1876    }
1877}