reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8    PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12    future::TryFutureExt,
13    stream::{FuturesOrdered, StreamExt},
14    Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21    helpers::{EthBlocks, LoadReceipt},
22    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23    RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32    ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37    collections::{HashMap, VecDeque},
38    fmt,
39    iter::{Peekable, StepBy},
40    ops::RangeInclusive,
41    pin::Pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::{
46    sync::{mpsc::Receiver, oneshot, Mutex},
47    time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53    Eth: FullEthApiTypes
54        + RpcNodeCoreExt<Provider: BlockIdReader>
55        + LoadReceipt
56        + EthBlocks
57        + 'static,
58{
59    /// Returns logs matching given filter object, no query limits
60    fn logs(
61        &self,
62        filter: Filter,
63        limits: QueryLimits,
64    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65        trace!(target: "rpc::eth", "Serving eth_getLogs");
66        self.logs_for_filter(filter, limits).map_err(|e| e.into())
67    }
68}
69
/// Block-count threshold used to decide between cached mode and range mode processing.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Number of bloom filter matches that triggers reduced caching.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Number of bloom filter matches that triggers moderately reduced caching.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum number of blocks before bloom filter match adjustments are applied.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// The maximum number of headers we read at once when handling a range filter.
///
/// With ~530 bytes per header this is ~500kb.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Threshold for enabling parallel processing in range mode.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1_000;

/// Default concurrency for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91/// `Eth` filter RPC implementation.
92///
93/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
94pub struct EthFilter<Eth: EthApiTypes> {
95    /// All nested fields bundled together
96    inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101    Eth: EthApiTypes,
102{
103    fn clone(&self) -> Self {
104        Self { inner: self.inner.clone() }
105    }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110    Eth: EthApiTypes + 'static,
111{
112    /// Creates a new, shareable instance.
113    ///
114    /// This uses the given pool to get notified about new transactions, the provider to interact
115    /// with the blockchain, the cache to fetch cacheable data, like the logs.
116    ///
117    /// See also [`EthFilterConfig`].
118    ///
119    /// This also spawns a task that periodically clears stale filters.
120    ///
121    /// # Create a new instance with [`EthApi`](crate::EthApi)
122    ///
123    /// ```no_run
124    /// use reth_evm_ethereum::EthEvmConfig;
125    /// use reth_network_api::noop::NoopNetwork;
126    /// use reth_provider::noop::NoopProvider;
127    /// use reth_rpc::{EthApi, EthFilter};
128    /// use reth_tasks::TokioTaskExecutor;
129    /// use reth_transaction_pool::noop::NoopTransactionPool;
130    /// let eth_api = EthApi::builder(
131    ///     NoopProvider::default(),
132    ///     NoopTransactionPool::default(),
133    ///     NoopNetwork::default(),
134    ///     EthEvmConfig::mainnet(),
135    /// )
136    /// .build();
137    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
138    /// ```
139    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
140        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141            config;
142        let inner = EthFilterInner {
143            eth_api,
144            active_filters: ActiveFilters::new(),
145            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146            max_headers_range: MAX_HEADERS_RANGE,
147            task_spawner,
148            stale_filter_ttl,
149            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150        };
151
152        let eth_filter = Self { inner: Arc::new(inner) };
153
154        let this = eth_filter.clone();
155        eth_filter.inner.task_spawner.spawn_critical(
156            "eth-filters_stale-filters-clean",
157            Box::pin(async move {
158                this.watch_and_clear_stale_filters().await;
159            }),
160        );
161
162        eth_filter
163    }
164
165    /// Returns all currently active filters
166    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167        &self.inner.active_filters
168    }
169
170    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
171    /// Nonetheless, this endless future frees the thread at every await point.
172    async fn watch_and_clear_stale_filters(&self) {
173        let mut interval = tokio::time::interval_at(
174            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175            self.inner.stale_filter_ttl,
176        );
177        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178        loop {
179            interval.tick().await;
180            self.clear_stale_filters(Instant::now()).await;
181        }
182    }
183
184    /// Clears all filters that have not been polled for longer than the configured
185    /// `stale_filter_ttl` at the given instant.
186    pub async fn clear_stale_filters(&self, now: Instant) {
187        trace!(target: "rpc::eth", "clear stale filters");
188        let mut filters = self.active_filters().inner.lock().await;
189        filters.retain(|id, filter| {
190            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192            if !is_valid {
193                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194            }
195
196            is_valid
197        });
198        filters.shrink_to_fit();
199    }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205        + RpcNodeCoreExt
206        + LoadReceipt
207        + EthBlocks
208        + 'static,
209{
210    /// Access the underlying provider.
211    fn provider(&self) -> &Eth::Provider {
212        self.inner.eth_api.provider()
213    }
214
215    /// Access the underlying pool.
216    fn pool(&self) -> &Eth::Pool {
217        self.inner.eth_api.pool()
218    }
219
220    /// Returns all the filter changes for the given id, if any
221    pub async fn filter_changes(
222        &self,
223        id: FilterId,
224    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225        let info = self.provider().chain_info()?;
226        let best_number = info.best_number;
227
228        // start_block is the block from which we should start fetching changes, the next block from
229        // the last time changes were polled, in other words the best block at last poll + 1
230        let (start_block, kind) = {
231            let mut filters = self.inner.active_filters.inner.lock().await;
232            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234            if filter.block > best_number {
235                // no new blocks since the last poll
236                return Ok(FilterChanges::Empty)
237            }
238
239            // update filter
240            // we fetch all changes from [filter.block..best_block], so we advance the filter's
241            // block to `best_block +1`, the next from which we should start fetching changes again
242            let mut block = best_number + 1;
243            std::mem::swap(&mut filter.block, &mut block);
244            filter.last_poll_timestamp = Instant::now();
245
246            (block, filter.kind.clone())
247        };
248
249        match kind {
250            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251            FilterKind::Block => {
252                // Note: we need to fetch the block hashes from inclusive range
253                // [start_block..best_block]
254                let end_block = best_number + 1;
255                let block_hashes =
256                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
257                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258                    )?;
259                Ok(FilterChanges::Hashes(block_hashes))
260            }
261            FilterKind::Log(filter) => {
262                let (from_block_number, to_block_number) = match filter.block_option {
263                    FilterBlockOption::Range { from_block, to_block } => {
264                        let from = from_block
265                            .map(|num| self.provider().convert_block_number(num))
266                            .transpose()?
267                            .flatten();
268                        let to = to_block
269                            .map(|num| self.provider().convert_block_number(num))
270                            .transpose()?
271                            .flatten();
272                        logs_utils::get_filter_block_range(from, to, start_block, info)?
273                    }
274                    FilterBlockOption::AtBlockHash(_) => {
275                        // blockHash is equivalent to fromBlock = toBlock = the block number with
276                        // hash blockHash
277                        // get_logs_in_block_range is inclusive
278                        (start_block, best_number)
279                    }
280                };
281                let logs = self
282                    .inner
283                    .clone()
284                    .get_logs_in_block_range(
285                        *filter,
286                        from_block_number,
287                        to_block_number,
288                        self.inner.query_limits,
289                    )
290                    .await?;
291                Ok(FilterChanges::Logs(logs))
292            }
293        }
294    }
295
296    /// Returns an array of all logs matching filter with given id.
297    ///
298    /// Returns an error if no matching log filter exists.
299    ///
300    /// Handler for `eth_getFilterLogs`
301    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
302        let filter = {
303            let mut filters = self.inner.active_filters.inner.lock().await;
304            let filter =
305                filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
306            if let FilterKind::Log(ref inner_filter) = filter.kind {
307                filter.last_poll_timestamp = Instant::now();
308                *inner_filter.clone()
309            } else {
310                // Not a log filter
311                return Err(EthFilterError::FilterNotFound(id))
312            }
313        };
314
315        self.logs_for_filter(filter, self.inner.query_limits).await
316    }
317
318    /// Returns logs matching given filter object.
319    async fn logs_for_filter(
320        &self,
321        filter: Filter,
322        limits: QueryLimits,
323    ) -> Result<Vec<Log>, EthFilterError> {
324        self.inner.clone().logs_for_filter(filter, limits).await
325    }
326}
327
328#[async_trait]
329impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
330where
331    Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
332{
333    /// Handler for `eth_newFilter`
334    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
335        trace!(target: "rpc::eth", "Serving eth_newFilter");
336        self.inner
337            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
338            .await
339    }
340
341    /// Handler for `eth_newBlockFilter`
342    async fn new_block_filter(&self) -> RpcResult<FilterId> {
343        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
344        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
345    }
346
347    /// Handler for `eth_newPendingTransactionFilter`
348    async fn new_pending_transaction_filter(
349        &self,
350        kind: Option<PendingTransactionFilterKind>,
351    ) -> RpcResult<FilterId> {
352        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
353
354        let transaction_kind = match kind.unwrap_or_default() {
355            PendingTransactionFilterKind::Hashes => {
356                let receiver = self.pool().pending_transactions_listener();
357                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
358                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
359            }
360            PendingTransactionFilterKind::Full => {
361                let stream = self.pool().new_pending_pool_transactions_listener();
362                let full_txs_receiver = FullTransactionsReceiver::new(
363                    stream,
364                    dyn_clone::clone(self.inner.eth_api.converter()),
365                );
366                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
367                    full_txs_receiver,
368                )))
369            }
370        };
371
372        // Install the filter and propagate any errors
373        self.inner.install_filter(transaction_kind).await
374    }
375
376    /// Handler for `eth_getFilterChanges`
377    async fn filter_changes(
378        &self,
379        id: FilterId,
380    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
381        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
382        Ok(Self::filter_changes(self, id).await?)
383    }
384
385    /// Returns an array of all logs matching filter with given id.
386    ///
387    /// Returns an error if no matching log filter exists.
388    ///
389    /// Handler for `eth_getFilterLogs`
390    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
391        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
392        Ok(Self::filter_logs(self, id).await?)
393    }
394
395    /// Handler for `eth_uninstallFilter`
396    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
397        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
398        let mut filters = self.inner.active_filters.inner.lock().await;
399        if filters.remove(&id).is_some() {
400            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
401            Ok(true)
402        } else {
403            Ok(false)
404        }
405    }
406
407    /// Returns logs matching given filter object.
408    ///
409    /// Handler for `eth_getLogs`
410    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
411        trace!(target: "rpc::eth", "Serving eth_getLogs");
412        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
413    }
414}
415
416impl<Eth> std::fmt::Debug for EthFilter<Eth>
417where
418    Eth: EthApiTypes,
419{
420    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421        f.debug_struct("EthFilter").finish_non_exhaustive()
422    }
423}
424
425/// Container type `EthFilter`
426#[derive(Debug)]
427struct EthFilterInner<Eth: EthApiTypes> {
428    /// Inner `eth` API implementation.
429    eth_api: Eth,
430    /// All currently installed filters.
431    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
432    /// Provides ids to identify filters
433    id_provider: Arc<dyn IdProvider>,
434    /// limits for logs queries
435    query_limits: QueryLimits,
436    /// maximum number of headers to read at once for range filter
437    max_headers_range: u64,
438    /// The type that can spawn tasks.
439    task_spawner: Box<dyn TaskSpawner>,
440    /// Duration since the last filter poll, after which the filter is considered stale
441    stale_filter_ttl: Duration,
442}
443
444impl<Eth> EthFilterInner<Eth>
445where
446    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
447        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
448        + LoadReceipt
449        + EthBlocks
450        + 'static,
451{
452    /// Access the underlying provider.
453    fn provider(&self) -> &Eth::Provider {
454        self.eth_api.provider()
455    }
456
457    /// Access the underlying [`EthStateCache`].
458    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
459        self.eth_api.cache()
460    }
461
462    /// Returns logs matching given filter object.
463    async fn logs_for_filter(
464        self: Arc<Self>,
465        filter: Filter,
466        limits: QueryLimits,
467    ) -> Result<Vec<Log>, EthFilterError> {
468        match filter.block_option {
469            FilterBlockOption::AtBlockHash(block_hash) => {
470                // First try to get cached block and receipts, as it's likely they're already cached
471                let Some((receipts, maybe_block)) =
472                    self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
473                else {
474                    return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
475                };
476
477                // Get header - from cached block if available, otherwise from provider
478                let header = if let Some(block) = &maybe_block {
479                    block.header().clone()
480                } else {
481                    self.provider()
482                        .header_by_hash_or_number(block_hash.into())?
483                        .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?
484                };
485
486                let block_num_hash = BlockNumHash::new(header.number(), block_hash);
487
488                let mut all_logs = Vec::new();
489                append_matching_block_logs(
490                    &mut all_logs,
491                    maybe_block
492                        .map(ProviderOrBlock::Block)
493                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
494                    &filter,
495                    block_num_hash,
496                    &receipts,
497                    false,
498                    header.timestamp(),
499                )?;
500
501                Ok(all_logs)
502            }
503            FilterBlockOption::Range { from_block, to_block } => {
504                // Handle special case where from block is pending
505                if from_block.is_some_and(|b| b.is_pending()) {
506                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
507                    if !(to_block.is_pending() || to_block.is_number()) {
508                        // always empty range
509                        return Ok(Vec::new());
510                    }
511                    // Try to get pending block and receipts
512                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
513                        if let BlockNumberOrTag::Number(to_block) = to_block &&
514                            to_block < pending_block.block.number()
515                        {
516                            // this block range is empty based on the user input
517                            return Ok(Vec::new());
518                        }
519
520                        let info = self.provider().chain_info()?;
521                        if pending_block.block.number() > info.best_number {
522                            // only consider the pending block if it is ahead of the chain
523                            let mut all_logs = Vec::new();
524                            let timestamp = pending_block.block.timestamp();
525                            let block_num_hash = pending_block.block.num_hash();
526                            append_matching_block_logs(
527                                &mut all_logs,
528                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
529                                &filter,
530                                block_num_hash,
531                                &pending_block.receipts,
532                                false, // removed = false for pending blocks
533                                timestamp,
534                            )?;
535                            return Ok(all_logs);
536                        }
537                    }
538                }
539
540                let info = self.provider().chain_info()?;
541                let start_block = info.best_number;
542                let from = from_block
543                    .map(|num| self.provider().convert_block_number(num))
544                    .transpose()?
545                    .flatten();
546                let to = to_block
547                    .map(|num| self.provider().convert_block_number(num))
548                    .transpose()?
549                    .flatten();
550
551                // Return error if toBlock exceeds current head
552                if let Some(t) = to &&
553                    t > info.best_number
554                {
555                    return Err(EthFilterError::BlockRangeExceedsHead);
556                }
557
558                if let Some(f) = from &&
559                    f > info.best_number
560                {
561                    // start block higher than local head, can return empty
562                    return Ok(Vec::new());
563                }
564
565                let (from_block_number, to_block_number) =
566                    logs_utils::get_filter_block_range(from, to, start_block, info)?;
567
568                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
569                    .await
570            }
571        }
572    }
573
574    /// Installs a new filter and returns the new identifier.
575    async fn install_filter(
576        &self,
577        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
578    ) -> RpcResult<FilterId> {
579        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
580        let subscription_id = self.id_provider.next_id();
581
582        let id = match subscription_id {
583            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
584            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
585        };
586        let mut filters = self.active_filters.inner.lock().await;
587        filters.insert(
588            id.clone(),
589            ActiveFilter {
590                block: last_poll_block_number,
591                last_poll_timestamp: Instant::now(),
592                kind,
593            },
594        );
595        Ok(id)
596    }
597
598    /// Returns all logs in the given _inclusive_ range that match the filter
599    ///
600    /// Returns an error if:
601    ///  - underlying database error
602    ///  - amount of matches exceeds configured limit
603    async fn get_logs_in_block_range(
604        self: Arc<Self>,
605        filter: Filter,
606        from_block: u64,
607        to_block: u64,
608        limits: QueryLimits,
609    ) -> Result<Vec<Log>, EthFilterError> {
610        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
611
612        // perform boundary checks first
613        if to_block < from_block {
614            return Err(EthFilterError::InvalidBlockRangeParams)
615        }
616
617        if let Some(max_blocks_per_filter) =
618            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
619        {
620            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
621        }
622
623        let (tx, rx) = oneshot::channel();
624        let this = self.clone();
625        self.task_spawner.spawn_blocking(Box::pin(async move {
626            let res =
627                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
628            let _ = tx.send(res);
629        }));
630
631        rx.await.map_err(|_| EthFilterError::InternalError)?
632    }
633
634    /// Returns all logs in the given _inclusive_ range that match the filter
635    ///
636    /// Note: This function uses a mix of blocking db operations for fetching indices and header
637    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
638    /// This function is considered blocking and should thus be spawned on a blocking task.
639    ///
640    /// Returns an error if:
641    ///  - underlying database error
642    async fn get_logs_in_block_range_inner(
643        self: Arc<Self>,
644        filter: &Filter,
645        from_block: u64,
646        to_block: u64,
647        limits: QueryLimits,
648    ) -> Result<Vec<Log>, EthFilterError> {
649        let mut all_logs = Vec::new();
650        let mut matching_headers = Vec::new();
651
652        // get current chain tip to determine processing mode
653        let chain_tip = self.provider().best_block_number()?;
654
655        // first collect all headers that match the bloom filter for cached mode decision
656        for (from, to) in
657            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
658        {
659            let headers = self.provider().headers_range(from..=to)?;
660
661            let mut headers_iter = headers.into_iter().peekable();
662
663            while let Some(header) = headers_iter.next() {
664                if !filter.matches_bloom(header.logs_bloom()) {
665                    continue
666                }
667
668                let current_number = header.number();
669
670                let block_hash = match headers_iter.peek() {
671                    Some(next_header) if next_header.number() == current_number + 1 => {
672                        // Headers are consecutive, use the more efficient parent_hash
673                        next_header.parent_hash()
674                    }
675                    _ => {
676                        // Headers not consecutive or last header, calculate hash
677                        header.hash_slow()
678                    }
679                };
680
681                matching_headers.push(SealedHeader::new(header, block_hash));
682            }
683        }
684
685        // initialize the appropriate range mode based on collected headers
686        let mut range_mode = RangeMode::new(
687            self.clone(),
688            matching_headers,
689            from_block,
690            to_block,
691            self.max_headers_range,
692            chain_tip,
693        );
694
695        // iterate through the range mode to get receipts and blocks
696        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
697            range_mode.next().await?
698        {
699            let num_hash = header.num_hash();
700            append_matching_block_logs(
701                &mut all_logs,
702                recovered_block
703                    .map(ProviderOrBlock::Block)
704                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
705                filter,
706                num_hash,
707                &receipts,
708                false,
709                header.timestamp(),
710            )?;
711
712            // size check but only if range is multiple blocks, so we always return all
713            // logs of a single block
714            let is_multi_block_range = from_block != to_block;
715            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
716                is_multi_block_range &&
717                all_logs.len() > max_logs_per_response
718            {
719                debug!(
720                    target: "rpc::eth::filter",
721                    logs_found = all_logs.len(),
722                    max_logs_per_response,
723                    from_block,
724                    to_block = num_hash.number.saturating_sub(1),
725                    "Query exceeded max logs per response limit"
726                );
727                return Err(EthFilterError::QueryExceedsMaxResults {
728                    max_logs: max_logs_per_response,
729                    from_block,
730                    to_block: num_hash.number.saturating_sub(1),
731                });
732            }
733        }
734
735        Ok(all_logs)
736    }
737}
738
739/// All active filters
740#[derive(Debug, Clone, Default)]
741pub struct ActiveFilters<T> {
742    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
743}
744
745impl<T> ActiveFilters<T> {
746    /// Returns an empty instance.
747    pub fn new() -> Self {
748        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
749    }
750}
751
752/// An installed filter
753#[derive(Debug)]
754struct ActiveFilter<T> {
755    /// At which block the filter was polled last.
756    block: u64,
757    /// Last time this filter was polled.
758    last_poll_timestamp: Instant,
759    /// What kind of filter it is.
760    kind: FilterKind<T>,
761}
762
763/// A receiver for pending transactions that returns all new transactions since the last poll.
764#[derive(Debug, Clone)]
765struct PendingTransactionsReceiver {
766    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
767}
768
769impl PendingTransactionsReceiver {
770    fn new(receiver: Receiver<TxHash>) -> Self {
771        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
772    }
773
774    /// Returns all new pending transactions received since the last poll.
775    async fn drain<T>(&self) -> FilterChanges<T> {
776        let mut pending_txs = Vec::new();
777        let mut prepared_stream = self.txs_receiver.lock().await;
778
779        while let Ok(tx_hash) = prepared_stream.try_recv() {
780            pending_txs.push(tx_hash);
781        }
782
783        // Convert the vector of hashes into FilterChanges::Hashes
784        FilterChanges::Hashes(pending_txs)
785    }
786}
787
788/// A structure to manage and provide access to a stream of full transaction details.
789#[derive(Debug, Clone)]
790struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
791    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
792    converter: TxCompat,
793}
794
795impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
796where
797    T: PoolTransaction + 'static,
798    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
799{
800    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
801    fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
802        Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
803    }
804
805    /// Returns all new pending transactions received since the last poll.
806    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
807        let mut pending_txs = Vec::new();
808        let mut prepared_stream = self.txs_stream.lock().await;
809
810        while let Ok(tx) = prepared_stream.try_recv() {
811            match self.converter.fill_pending(tx.transaction.to_consensus()) {
812                Ok(tx) => pending_txs.push(tx),
813                Err(err) => {
814                    error!(target: "rpc",
815                        %err,
816                        "Failed to fill txn with block context"
817                    );
818                }
819            }
820        }
821        FilterChanges::Transactions(pending_txs)
822    }
823}
824
825/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
826#[async_trait]
827trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
828    async fn drain(&self) -> FilterChanges<T>;
829}
830
831#[async_trait]
832impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
833    for FullTransactionsReceiver<T, TxCompat>
834where
835    T: PoolTransaction + 'static,
836    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
837{
838    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
839        Self::drain(self).await
840    }
841}
842
843/// Represents the kind of pending transaction data that can be retrieved.
844///
845/// This enum differentiates between two kinds of pending transaction data:
846/// - Just the transaction hashes.
847/// - Full transaction details.
848#[derive(Debug, Clone)]
849enum PendingTransactionKind<T> {
850    Hashes(PendingTransactionsReceiver),
851    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
852}
853
854impl<T: 'static> PendingTransactionKind<T> {
855    async fn drain(&self) -> FilterChanges<T> {
856        match self {
857            Self::Hashes(receiver) => receiver.drain().await,
858            Self::FullTransaction(receiver) => receiver.drain().await,
859        }
860    }
861}
862
/// The kind of an installed filter.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// A log filter, carrying the (boxed) filter criteria.
    Log(Box<Filter>),
    /// A block filter; carries no additional data.
    Block,
    /// A pending transaction filter, carrying the receiver that is drained on poll.
    PendingTransaction(PendingTransactionKind<T>),
}
869
/// An iterator that yields _inclusive_ block ranges of a given step size.
///
/// For a range `a..=b` and step `s`, the yielded items are `(a, a + s)`, `(a + s + 1, a + 2s + 1)`
/// and so on, i.e. every chunk spans at most `s + 1` blocks. The end of the final chunk is clamped
/// to `b`. An empty input range yields nothing.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block of the overall range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose chunks span at most `step + 1` blocks each.
    ///
    /// The stride between chunk starts is `step + 1`. It has to fit into a `usize` for
    /// [`Iterator::step_by`], so oversized steps are clamped instead of overflowing or being
    /// truncated; the stored step is derived from the clamped stride so that consecutive chunks
    /// never overlap and never leave gaps.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        // `stride >= 1` always holds, and `usize` is at most 64 bits wide, so this is lossless.
        let step = (stride - 1) as u64;
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate so that ranges ending near `u64::MAX` cannot overflow. `start` comes from the
        // underlying range and is therefore never greater than `self.end`, which guarantees
        // `start <= end` here.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
896
/// Errors that can occur in the handler implementation
///
/// The `#[error(..)]` strings below are the messages produced by the `Display` implementation.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// Filter not found.
    ///
    /// Carries the id of the filter that was looked up; the id is not part of the message.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// Invalid block range.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// Block range extends beyond current head.
    #[error("block range extends beyond current head block")]
    BlockRangeExceedsHead,
    /// Query scope is too broad.
    ///
    /// The payload is the value reported in the message as the maximum block range.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// Query result is too large.
    ///
    /// The message suggests a narrower block range to retry with.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response
        max_logs: usize,
        /// Start block of the suggested retry range
        from_block: u64,
        /// End block of the suggested retry range (last successfully processed block)
        to_block: u64,
    },
    /// Error serving request in `eth_` namespace.
    ///
    /// Transparent wrapper: the message of the wrapped [`EthApiError`] is used as is.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Error thrown when a spawned task failed to deliver a response.
    #[error("internal filter error")]
    InternalError,
}
929
930impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
931    fn from(err: EthFilterError) -> Self {
932        match err {
933            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
934                jsonrpsee::types::error::INVALID_PARAMS_CODE,
935                "filter not found",
936            ),
937            err @ EthFilterError::InternalError => {
938                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
939            }
940            EthFilterError::EthAPIError(err) => err.into(),
941            err @ (EthFilterError::InvalidBlockRangeParams |
942            EthFilterError::QueryExceedsMaxBlocks(_) |
943            EthFilterError::QueryExceedsMaxResults { .. } |
944            EthFilterError::BlockRangeExceedsHead) => {
945                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
946            }
947        }
948    }
949}
950
951impl From<ProviderError> for EthFilterError {
952    fn from(err: ProviderError) -> Self {
953        Self::EthAPIError(err.into())
954    }
955}
956
957impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
958    fn from(err: logs_utils::FilterBlockRangeError) -> Self {
959        match err {
960            logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
961            logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
962        }
963    }
964}
965
966/// Helper type for the common pattern of returning receipts, block and the original header that is
967/// a match for the filter.
968struct ReceiptBlockResult<P>
969where
970    P: ReceiptProvider + BlockReader,
971{
972    /// We always need the entire receipts for the matching block.
973    receipts: Arc<Vec<ProviderReceipt<P>>>,
974    /// Block can be optional and we can fetch it lazily when needed.
975    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
976    /// The header of the block.
977    header: SealedHeader<<P as HeaderProvider>::Header>,
978}
979
980/// Represents different modes for processing block ranges when filtering logs
981enum RangeMode<
982    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
983        + EthApiTypes
984        + LoadReceipt
985        + EthBlocks
986        + 'static,
987> {
988    /// Use cache-based processing for recent blocks
989    Cached(CachedMode<Eth>),
990    /// Use range-based processing for older blocks
991    Range(RangeBlockMode<Eth>),
992}
993
994impl<
995        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
996            + EthApiTypes
997            + LoadReceipt
998            + EthBlocks
999            + 'static,
1000    > RangeMode<Eth>
1001{
1002    /// Creates a new `RangeMode`.
1003    fn new(
1004        filter_inner: Arc<EthFilterInner<Eth>>,
1005        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1006        from_block: u64,
1007        to_block: u64,
1008        max_headers_range: u64,
1009        chain_tip: u64,
1010    ) -> Self {
1011        let block_count = to_block - from_block + 1;
1012        let distance_from_tip = chain_tip.saturating_sub(to_block);
1013
1014        // Determine if we should use cached mode based on range characteristics
1015        let use_cached_mode =
1016            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1017
1018        if use_cached_mode && !sealed_headers.is_empty() {
1019            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1020        } else {
1021            Self::Range(RangeBlockMode {
1022                filter_inner,
1023                iter: sealed_headers.into_iter().peekable(),
1024                next: VecDeque::new(),
1025                max_range: max_headers_range as usize,
1026                pending_tasks: FuturesOrdered::new(),
1027            })
1028        }
1029    }
1030
1031    /// Determines whether to use cached mode based on bloom filter matches and range size
1032    const fn should_use_cached_mode(
1033        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1034        block_count: u64,
1035        distance_from_tip: u64,
1036    ) -> bool {
1037        // Headers are already filtered by bloom, so count equals length
1038        let bloom_matches = headers.len();
1039
1040        // Calculate adjusted threshold based on bloom matches
1041        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1042
1043        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1044    }
1045
1046    /// Calculates the adjusted cache threshold based on bloom filter matches
1047    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1048        // Only apply adjustments for larger ranges
1049        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1050            return CACHED_MODE_BLOCK_THRESHOLD;
1051        }
1052
1053        match bloom_matches {
1054            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1055            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1056            _ => CACHED_MODE_BLOCK_THRESHOLD,
1057        }
1058    }
1059
1060    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
1061    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1062        match self {
1063            Self::Cached(cached) => cached.next().await,
1064            Self::Range(range) => range.next().await,
1065        }
1066    }
1067}
1068
1069/// Mode for processing blocks using cache optimization for recent blocks
1070struct CachedMode<
1071    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1072        + EthApiTypes
1073        + LoadReceipt
1074        + EthBlocks
1075        + 'static,
1076> {
1077    filter_inner: Arc<EthFilterInner<Eth>>,
1078    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1079}
1080
1081impl<
1082        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1083            + EthApiTypes
1084            + LoadReceipt
1085            + EthBlocks
1086            + 'static,
1087    > CachedMode<Eth>
1088{
1089    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1090        for header in self.headers_iter.by_ref() {
1091            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1092            if let Some((receipts, maybe_block)) =
1093                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1094            {
1095                return Ok(Some(ReceiptBlockResult {
1096                    receipts,
1097                    recovered_block: maybe_block,
1098                    header,
1099                }));
1100            }
1101        }
1102
1103        Ok(None) // No more headers
1104    }
1105}
1106
1107/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1108type ReceiptFetchFuture<P> =
1109    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1110
1111/// Mode for processing blocks using range queries for older blocks
1112struct RangeBlockMode<
1113    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1114        + EthApiTypes
1115        + LoadReceipt
1116        + EthBlocks
1117        + 'static,
1118> {
1119    filter_inner: Arc<EthFilterInner<Eth>>,
1120    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1121    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1122    max_range: usize,
1123    // Stream of ongoing receipt fetching tasks
1124    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1125}
1126
1127impl<
1128        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1129            + EthApiTypes
1130            + LoadReceipt
1131            + EthBlocks
1132            + 'static,
1133    > RangeBlockMode<Eth>
1134{
1135    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1136        loop {
1137            // First, try to return any already processed result from buffer
1138            if let Some(result) = self.next.pop_front() {
1139                return Ok(Some(result));
1140            }
1141
1142            // Try to get a completed task result if there are pending tasks
1143            if let Some(task_result) = self.pending_tasks.next().await {
1144                self.next.extend(task_result?);
1145                continue;
1146            }
1147
1148            // No pending tasks - try to generate more work
1149            let Some(next_header) = self.iter.next() else {
1150                // No more headers to process
1151                return Ok(None);
1152            };
1153
1154            let mut range_headers = Vec::with_capacity(self.max_range);
1155            range_headers.push(next_header);
1156
1157            // Collect consecutive blocks up to max_range size
1158            while range_headers.len() < self.max_range {
1159                let Some(peeked) = self.iter.peek() else { break };
1160                let Some(last_header) = range_headers.last() else { break };
1161
1162                let expected_next = last_header.number() + 1;
1163                if peeked.number() != expected_next {
1164                    trace!(
1165                        target: "rpc::eth::filter",
1166                        last_block = last_header.number(),
1167                        next_block = peeked.number(),
1168                        expected = expected_next,
1169                        range_size = range_headers.len(),
1170                        "Non-consecutive block detected, stopping range collection"
1171                    );
1172                    break; // Non-consecutive block, stop here
1173                }
1174
1175                let Some(next_header) = self.iter.next() else { break };
1176                range_headers.push(next_header);
1177            }
1178
1179            // Check if we should use parallel processing for large ranges
1180            let remaining_headers = self.iter.len() + range_headers.len();
1181            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1182                self.spawn_parallel_tasks(range_headers);
1183                // Continue loop to await the spawned tasks
1184            } else {
1185                // Process small range sequentially and add results to buffer
1186                if let Some(result) = self.process_small_range(range_headers).await? {
1187                    return Ok(Some(result));
1188                }
1189                // Continue loop to check for more work
1190            }
1191        }
1192    }
1193
1194    /// Process a small range of headers sequentially
1195    ///
1196    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1197    async fn process_small_range(
1198        &mut self,
1199        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1200    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1201        // Process each header individually to avoid queuing for all receipts
1202        for header in range_headers {
1203            // First check if already cached to avoid unnecessary provider calls
1204            let (maybe_block, maybe_receipts) = self
1205                .filter_inner
1206                .eth_cache()
1207                .maybe_cached_block_and_receipts(header.hash())
1208                .await?;
1209
1210            let receipts = match maybe_receipts {
1211                Some(receipts) => receipts,
1212                None => {
1213                    // Not cached - fetch directly from provider
1214                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1215                        Some(receipts) => Arc::new(receipts),
1216                        None => continue, // No receipts found
1217                    }
1218                }
1219            };
1220
1221            if !receipts.is_empty() {
1222                self.next.push_back(ReceiptBlockResult {
1223                    receipts,
1224                    recovered_block: maybe_block,
1225                    header,
1226                });
1227            }
1228        }
1229
1230        Ok(self.next.pop_front())
1231    }
1232
1233    /// Spawn parallel tasks for processing a large range of headers
1234    ///
1235    /// This is used when the remaining headers count is at or above
1236    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1237    fn spawn_parallel_tasks(
1238        &mut self,
1239        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1240    ) {
1241        // Split headers into chunks
1242        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1243        let header_chunks = range_headers
1244            .into_iter()
1245            .chunks(chunk_size)
1246            .into_iter()
1247            .map(|chunk| chunk.collect::<Vec<_>>())
1248            .collect::<Vec<_>>();
1249
1250        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1251        for chunk_headers in header_chunks {
1252            let filter_inner = self.filter_inner.clone();
1253            let chunk_task = Box::pin(async move {
1254                let chunk_task = tokio::task::spawn_blocking(move || {
1255                    let mut chunk_results = Vec::new();
1256
1257                    for header in chunk_headers {
1258                        // Fetch directly from provider - RangeMode is used for older blocks
1259                        // unlikely to be cached
1260                        let receipts = match filter_inner
1261                            .provider()
1262                            .receipts_by_block(header.hash().into())?
1263                        {
1264                            Some(receipts) => Arc::new(receipts),
1265                            None => continue, // No receipts found
1266                        };
1267
1268                        if !receipts.is_empty() {
1269                            chunk_results.push(ReceiptBlockResult {
1270                                receipts,
1271                                recovered_block: None,
1272                                header,
1273                            });
1274                        }
1275                    }
1276
1277                    Ok(chunk_results)
1278                });
1279
1280                // Await the blocking task and handle the result
1281                match chunk_task.await {
1282                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1283                    Ok(Err(e)) => Err(e),
1284                    Err(join_err) => {
1285                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1286                        Err(EthFilterError::InternalError)
1287                    }
1288                }
1289            });
1290
1291            self.pending_tasks.push_back(chunk_task);
1292        }
1293    }
1294}
1295
1296#[cfg(test)]
1297mod tests {
1298    use super::*;
1299    use crate::{eth::EthApi, EthApiBuilder};
1300    use alloy_network::Ethereum;
1301    use alloy_primitives::FixedBytes;
1302    use rand::Rng;
1303    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1304    use reth_ethereum_primitives::TxType;
1305    use reth_evm_ethereum::EthEvmConfig;
1306    use reth_network_api::noop::NoopNetwork;
1307    use reth_provider::test_utils::MockEthProvider;
1308    use reth_rpc_convert::RpcConverter;
1309    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1310    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1311    use reth_tasks::TokioTaskExecutor;
1312    use reth_testing_utils::generators;
1313    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1314    use std::{collections::VecDeque, sync::Arc};
1315
1316    #[test]
1317    fn test_block_range_iter() {
1318        let mut rng = generators::rng();
1319
1320        let start = rng.random::<u32>() as u64;
1321        let end = start.saturating_add(rng.random::<u32>() as u64);
1322        let step = rng.random::<u16>() as u64;
1323        let range = start..=end;
1324        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1325        let (from, mut end) = iter.next().unwrap();
1326        assert_eq!(from, start);
1327        assert_eq!(end, (from + step).min(*range.end()));
1328
1329        for (next_from, next_end) in iter {
1330            // ensure range starts with previous end + 1
1331            assert_eq!(next_from, end + 1);
1332            end = next_end;
1333        }
1334
1335        assert_eq!(end, *range.end());
1336    }
1337
1338    // Helper function to create a test EthApi instance
1339    #[expect(clippy::type_complexity)]
1340    fn build_test_eth_api(
1341        provider: MockEthProvider,
1342    ) -> EthApi<
1343        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1344        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1345    > {
1346        EthApiBuilder::new(
1347            provider.clone(),
1348            testing_pool(),
1349            NoopNetwork::default(),
1350            EthEvmConfig::new(provider.chain_spec()),
1351        )
1352        .build()
1353    }
1354
1355    #[tokio::test]
1356    async fn test_range_block_mode_empty_range() {
1357        let provider = MockEthProvider::default();
1358        let eth_api = build_test_eth_api(provider);
1359
1360        let eth_filter = super::EthFilter::new(
1361            eth_api,
1362            EthFilterConfig::default(),
1363            Box::new(TokioTaskExecutor::default()),
1364        );
1365        let filter_inner = eth_filter.inner;
1366
1367        let headers = vec![];
1368        let max_range = 100;
1369
1370        let mut range_mode = RangeBlockMode {
1371            filter_inner,
1372            iter: headers.into_iter().peekable(),
1373            next: VecDeque::new(),
1374            max_range,
1375            pending_tasks: FuturesOrdered::new(),
1376        };
1377
1378        let result = range_mode.next().await;
1379        assert!(result.is_ok());
1380        assert!(result.unwrap().is_none());
1381    }
1382
1383    #[tokio::test]
1384    async fn test_range_block_mode_queued_results_priority() {
1385        let provider = MockEthProvider::default();
1386        let eth_api = build_test_eth_api(provider);
1387
1388        let eth_filter = super::EthFilter::new(
1389            eth_api,
1390            EthFilterConfig::default(),
1391            Box::new(TokioTaskExecutor::default()),
1392        );
1393        let filter_inner = eth_filter.inner;
1394
1395        let headers = vec![
1396            SealedHeader::new(
1397                alloy_consensus::Header { number: 100, ..Default::default() },
1398                FixedBytes::random(),
1399            ),
1400            SealedHeader::new(
1401                alloy_consensus::Header { number: 101, ..Default::default() },
1402                FixedBytes::random(),
1403            ),
1404        ];
1405
1406        // create specific mock results to test ordering
1407        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1408        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1409
1410        // create mock receipts to test receipt handling
1411        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1412            tx_type: TxType::Legacy,
1413            cumulative_gas_used: 100_000,
1414            logs: vec![],
1415            success: true,
1416        };
1417        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1418            tx_type: TxType::Eip1559,
1419            cumulative_gas_used: 200_000,
1420            logs: vec![],
1421            success: true,
1422        };
1423        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1424            tx_type: TxType::Eip2930,
1425            cumulative_gas_used: 150_000,
1426            logs: vec![],
1427            success: false, // Different success status
1428        };
1429
1430        let mock_result_1 = ReceiptBlockResult {
1431            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1432            recovered_block: None,
1433            header: SealedHeader::new(
1434                alloy_consensus::Header { number: 42, ..Default::default() },
1435                expected_block_hash_1,
1436            ),
1437        };
1438
1439        let mock_result_2 = ReceiptBlockResult {
1440            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1441            recovered_block: None,
1442            header: SealedHeader::new(
1443                alloy_consensus::Header { number: 43, ..Default::default() },
1444                expected_block_hash_2,
1445            ),
1446        };
1447
1448        let mut range_mode = RangeBlockMode {
1449            filter_inner,
1450            iter: headers.into_iter().peekable(),
1451            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1452            max_range: 100,
1453            pending_tasks: FuturesOrdered::new(),
1454        };
1455
1456        // first call should return the first queued result (FIFO order)
1457        let result1 = range_mode.next().await;
1458        assert!(result1.is_ok());
1459        let receipt_result1 = result1.unwrap().unwrap();
1460        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1461        assert_eq!(receipt_result1.header.number, 42);
1462
1463        // verify receipts
1464        assert_eq!(receipt_result1.receipts.len(), 2);
1465        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1466        assert_eq!(
1467            receipt_result1.receipts[0].cumulative_gas_used,
1468            mock_receipt_1.cumulative_gas_used
1469        );
1470        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1471        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1472        assert_eq!(
1473            receipt_result1.receipts[1].cumulative_gas_used,
1474            mock_receipt_2.cumulative_gas_used
1475        );
1476        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1477
1478        // second call should return the second queued result
1479        let result2 = range_mode.next().await;
1480        assert!(result2.is_ok());
1481        let receipt_result2 = result2.unwrap().unwrap();
1482        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1483        assert_eq!(receipt_result2.header.number, 43);
1484
1485        // verify receipts
1486        assert_eq!(receipt_result2.receipts.len(), 1);
1487        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1488        assert_eq!(
1489            receipt_result2.receipts[0].cumulative_gas_used,
1490            mock_receipt_3.cumulative_gas_used
1491        );
1492        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1493
1494        // queue should now be empty
1495        assert!(range_mode.next.is_empty());
1496
1497        let result3 = range_mode.next().await;
1498        assert!(result3.is_ok());
1499    }
1500
1501    #[tokio::test]
1502    async fn test_range_block_mode_single_block_no_receipts() {
1503        let provider = MockEthProvider::default();
1504        let eth_api = build_test_eth_api(provider);
1505
1506        let eth_filter = super::EthFilter::new(
1507            eth_api,
1508            EthFilterConfig::default(),
1509            Box::new(TokioTaskExecutor::default()),
1510        );
1511        let filter_inner = eth_filter.inner;
1512
1513        let headers = vec![SealedHeader::new(
1514            alloy_consensus::Header { number: 100, ..Default::default() },
1515            FixedBytes::random(),
1516        )];
1517
1518        let mut range_mode = RangeBlockMode {
1519            filter_inner,
1520            iter: headers.into_iter().peekable(),
1521            next: VecDeque::new(),
1522            max_range: 100,
1523            pending_tasks: FuturesOrdered::new(),
1524        };
1525
1526        let result = range_mode.next().await;
1527        assert!(result.is_ok());
1528    }
1529
1530    #[tokio::test]
1531    async fn test_range_block_mode_provider_receipts() {
1532        let provider = MockEthProvider::default();
1533
1534        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1535        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1536        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1537
1538        let block_hash_1 = FixedBytes::random();
1539        let block_hash_2 = FixedBytes::random();
1540        let block_hash_3 = FixedBytes::random();
1541
1542        provider.add_header(block_hash_1, header_1.clone());
1543        provider.add_header(block_hash_2, header_2.clone());
1544        provider.add_header(block_hash_3, header_3.clone());
1545
1546        // create mock receipts to test provider fetching with mock logs
1547        let mock_log = alloy_primitives::Log {
1548            address: alloy_primitives::Address::ZERO,
1549            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1550        };
1551
1552        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1553            tx_type: TxType::Legacy,
1554            cumulative_gas_used: 21_000,
1555            logs: vec![mock_log.clone()],
1556            success: true,
1557        };
1558        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1559            tx_type: TxType::Eip1559,
1560            cumulative_gas_used: 42_000,
1561            logs: vec![mock_log.clone()],
1562            success: true,
1563        };
1564        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1565            tx_type: TxType::Eip2930,
1566            cumulative_gas_used: 30_000,
1567            logs: vec![mock_log.clone()],
1568            success: false,
1569        };
1570
1571        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1572        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1573
1574        let eth_api = build_test_eth_api(provider);
1575
1576        let eth_filter = super::EthFilter::new(
1577            eth_api,
1578            EthFilterConfig::default(),
1579            Box::new(TokioTaskExecutor::default()),
1580        );
1581        let filter_inner = eth_filter.inner;
1582
1583        let headers = vec![
1584            SealedHeader::new(header_1, block_hash_1),
1585            SealedHeader::new(header_2, block_hash_2),
1586            SealedHeader::new(header_3, block_hash_3),
1587        ];
1588
1589        let mut range_mode = RangeBlockMode {
1590            filter_inner,
1591            iter: headers.into_iter().peekable(),
1592            next: VecDeque::new(),
1593            max_range: 3, // include the 3 blocks in the first queried results
1594            pending_tasks: FuturesOrdered::new(),
1595        };
1596
1597        // first call should fetch receipts from provider and return first block with receipts
1598        let result = range_mode.next().await;
1599        assert!(result.is_ok());
1600        let receipt_result = result.unwrap().unwrap();
1601
1602        assert_eq!(receipt_result.header.hash(), block_hash_1);
1603        assert_eq!(receipt_result.header.number, 100);
1604        assert_eq!(receipt_result.receipts.len(), 2);
1605
1606        // verify receipts
1607        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1608        assert_eq!(
1609            receipt_result.receipts[0].cumulative_gas_used,
1610            receipt_100_1.cumulative_gas_used
1611        );
1612        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1613
1614        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1615        assert_eq!(
1616            receipt_result.receipts[1].cumulative_gas_used,
1617            receipt_100_2.cumulative_gas_used
1618        );
1619        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1620
1621        // second call should return the second block with receipts
1622        let result2 = range_mode.next().await;
1623        assert!(result2.is_ok());
1624        let receipt_result2 = result2.unwrap().unwrap();
1625
1626        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1627        assert_eq!(receipt_result2.header.number, 101);
1628        assert_eq!(receipt_result2.receipts.len(), 1);
1629
1630        // verify receipts
1631        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1632        assert_eq!(
1633            receipt_result2.receipts[0].cumulative_gas_used,
1634            receipt_101_1.cumulative_gas_used
1635        );
1636        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1637
1638        // third call should return None since no more blocks with receipts
1639        let result3 = range_mode.next().await;
1640        assert!(result3.is_ok());
1641        assert!(result3.unwrap().is_none());
1642    }
1643
1644    #[tokio::test]
1645    async fn test_range_block_mode_iterator_exhaustion() {
1646        let provider = MockEthProvider::default();
1647
1648        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1649        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1650
1651        let block_hash_100 = FixedBytes::random();
1652        let block_hash_101 = FixedBytes::random();
1653
1654        // Associate headers with hashes first
1655        provider.add_header(block_hash_100, header_100.clone());
1656        provider.add_header(block_hash_101, header_101.clone());
1657
1658        // Add mock receipts so headers are actually processed
1659        let mock_receipt = reth_ethereum_primitives::Receipt {
1660            tx_type: TxType::Legacy,
1661            cumulative_gas_used: 21_000,
1662            logs: vec![],
1663            success: true,
1664        };
1665        provider.add_receipts(100, vec![mock_receipt.clone()]);
1666        provider.add_receipts(101, vec![mock_receipt.clone()]);
1667
1668        let eth_api = build_test_eth_api(provider);
1669
1670        let eth_filter = super::EthFilter::new(
1671            eth_api,
1672            EthFilterConfig::default(),
1673            Box::new(TokioTaskExecutor::default()),
1674        );
1675        let filter_inner = eth_filter.inner;
1676
1677        let headers = vec![
1678            SealedHeader::new(header_100, block_hash_100),
1679            SealedHeader::new(header_101, block_hash_101),
1680        ];
1681
1682        let mut range_mode = RangeBlockMode {
1683            filter_inner,
1684            iter: headers.into_iter().peekable(),
1685            next: VecDeque::new(),
1686            max_range: 1,
1687            pending_tasks: FuturesOrdered::new(),
1688        };
1689
1690        let result1 = range_mode.next().await;
1691        assert!(result1.is_ok());
1692        assert!(result1.unwrap().is_some()); // Should have processed block 100
1693
1694        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1695
1696        let result2 = range_mode.next().await;
1697        assert!(result2.is_ok());
1698        assert!(result2.unwrap().is_some()); // Should have processed block 101
1699
1700        // now iterator should be exhausted
1701        assert!(range_mode.iter.peek().is_none());
1702
1703        // further calls should return None
1704        let result3 = range_mode.next().await;
1705        assert!(result3.is_ok());
1706        assert!(result3.unwrap().is_none());
1707    }
1708
1709    #[tokio::test]
1710    async fn test_cached_mode_with_mock_receipts() {
1711        // create test data
1712        let test_hash = FixedBytes::from([42u8; 32]);
1713        let test_block_number = 100u64;
1714        let test_header = SealedHeader::new(
1715            alloy_consensus::Header {
1716                number: test_block_number,
1717                gas_used: 50_000,
1718                ..Default::default()
1719            },
1720            test_hash,
1721        );
1722
1723        // add a mock receipt to the provider with a mock log
1724        let mock_log = alloy_primitives::Log {
1725            address: alloy_primitives::Address::ZERO,
1726            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1727        };
1728
1729        let mock_receipt = reth_ethereum_primitives::Receipt {
1730            tx_type: TxType::Legacy,
1731            cumulative_gas_used: 21_000,
1732            logs: vec![mock_log],
1733            success: true,
1734        };
1735
1736        let provider = MockEthProvider::default();
1737        provider.add_header(test_hash, test_header.header().clone());
1738        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1739
1740        let eth_api = build_test_eth_api(provider);
1741        let eth_filter = super::EthFilter::new(
1742            eth_api,
1743            EthFilterConfig::default(),
1744            Box::new(TokioTaskExecutor::default()),
1745        );
1746        let filter_inner = eth_filter.inner;
1747
1748        let headers = vec![test_header.clone()];
1749
1750        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1751
1752        // should find the receipt from provider fallback (cache will be empty)
1753        let result = cached_mode.next().await.expect("next should succeed");
1754        let receipt_block_result = result.expect("should have receipt result");
1755        assert_eq!(receipt_block_result.header.hash(), test_hash);
1756        assert_eq!(receipt_block_result.header.number, test_block_number);
1757        assert_eq!(receipt_block_result.receipts.len(), 1);
1758        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1759        assert_eq!(
1760            receipt_block_result.receipts[0].cumulative_gas_used,
1761            mock_receipt.cumulative_gas_used
1762        );
1763        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1764
1765        // iterator should be exhausted
1766        let result2 = cached_mode.next().await;
1767        assert!(result2.is_ok());
1768        assert!(result2.unwrap().is_none());
1769    }
1770
1771    #[tokio::test]
1772    async fn test_cached_mode_empty_headers() {
1773        let provider = MockEthProvider::default();
1774        let eth_api = build_test_eth_api(provider);
1775
1776        let eth_filter = super::EthFilter::new(
1777            eth_api,
1778            EthFilterConfig::default(),
1779            Box::new(TokioTaskExecutor::default()),
1780        );
1781        let filter_inner = eth_filter.inner;
1782
1783        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1784
1785        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1786
1787        // should immediately return None for empty headers
1788        let result = cached_mode.next().await.expect("next should succeed");
1789        assert!(result.is_none());
1790    }
1791
1792    #[tokio::test]
1793    async fn test_non_consecutive_headers_after_bloom_filter() {
1794        let provider = MockEthProvider::default();
1795
1796        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1797        let mut expected_hashes = vec![];
1798        let mut prev_hash = alloy_primitives::B256::default();
1799
1800        // Create a transaction for blocks that will have receipts
1801        use alloy_consensus::TxLegacy;
1802        use reth_ethereum_primitives::{TransactionSigned, TxType};
1803
1804        let tx_inner = TxLegacy {
1805            chain_id: Some(1),
1806            nonce: 0,
1807            gas_price: 21_000,
1808            gas_limit: 21_000,
1809            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1810            value: alloy_primitives::U256::ZERO,
1811            input: alloy_primitives::Bytes::new(),
1812        };
1813        let signature = alloy_primitives::Signature::test_signature();
1814        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1815
1816        for i in 100u64..=103 {
1817            let header = alloy_consensus::Header {
1818                number: i,
1819                parent_hash: prev_hash,
1820                // Set bloom to match filter only for blocks 100 and 102
1821                logs_bloom: if i == 100 || i == 102 {
1822                    alloy_primitives::Bloom::from([1u8; 256])
1823                } else {
1824                    alloy_primitives::Bloom::default()
1825                },
1826                ..Default::default()
1827            };
1828
1829            let hash = header.hash_slow();
1830            expected_hashes.push(hash);
1831            prev_hash = hash;
1832
1833            // Add transaction to blocks that will have receipts (100 and 102)
1834            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1835
1836            let block = reth_ethereum_primitives::Block {
1837                header,
1838                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1839            };
1840            provider.add_block(hash, block);
1841        }
1842
1843        // Add receipts with logs only to blocks that match bloom
1844        let mock_log = alloy_primitives::Log {
1845            address: alloy_primitives::Address::ZERO,
1846            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1847        };
1848
1849        let receipt = reth_ethereum_primitives::Receipt {
1850            tx_type: TxType::Legacy,
1851            cumulative_gas_used: 21_000,
1852            logs: vec![mock_log],
1853            success: true,
1854        };
1855
1856        provider.add_receipts(100, vec![receipt.clone()]);
1857        provider.add_receipts(101, vec![]);
1858        provider.add_receipts(102, vec![receipt.clone()]);
1859        provider.add_receipts(103, vec![]);
1860
1861        // Add block body indices for each block so receipts can be fetched
1862        use reth_db_api::models::StoredBlockBodyIndices;
1863        provider
1864            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1865        provider
1866            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1867        provider
1868            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1869        provider
1870            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1871
1872        let eth_api = build_test_eth_api(provider);
1873        let eth_filter = EthFilter::new(
1874            eth_api,
1875            EthFilterConfig::default(),
1876            Box::new(TokioTaskExecutor::default()),
1877        );
1878
1879        // Use default filter which will match any non-empty bloom
1880        let filter = Filter::default();
1881
1882        // Get logs in the range - this will trigger the bloom filtering
1883        let logs = eth_filter
1884            .inner
1885            .clone()
1886            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1887            .await
1888            .expect("should succeed");
1889
1890        // We should get logs from blocks 100 and 102 only (bloom filtered)
1891        assert_eq!(logs.len(), 2);
1892
1893        assert_eq!(logs[0].block_number, Some(100));
1894        assert_eq!(logs[1].block_number, Some(102));
1895
1896        // Each block hash should be the hash of its own header, not derived from any other header
1897        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1898        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1899    }
1900}