reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8    PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12    future::TryFutureExt,
13    stream::{FuturesOrdered, StreamExt},
14    Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21    helpers::{EthBlocks, LoadReceipt},
22    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23    RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32    ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37    collections::{HashMap, VecDeque},
38    fmt,
39    iter::{Peekable, StepBy},
40    ops::RangeInclusive,
41    pin::Pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::{
46    sync::{mpsc::Receiver, oneshot, Mutex},
47    time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53    Eth: FullEthApiTypes
54        + RpcNodeCoreExt<Provider: BlockIdReader>
55        + LoadReceipt
56        + EthBlocks
57        + 'static,
58{
59    /// Returns logs matching given filter object, no query limits
60    fn logs(
61        &self,
62        filter: Filter,
63        limits: QueryLimits,
64    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65        trace!(target: "rpc::eth", "Serving eth_getLogs");
66        self.logs_for_filter(filter, limits).map_err(|e| e.into())
67    }
68}
69
/// Threshold for deciding between cached and range mode processing.
// NOTE(review): the code that consumes this value is not visible in this part of the file;
// presumably it is compared against a block count/distance — confirm at the use site.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Threshold for bloom filter matches that triggers reduced caching.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Threshold for bloom filter matches that triggers moderately reduced caching.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum block count to apply bloom filter match adjustments.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// The maximum number of headers we read at once when handling a range filter.
///
/// With ~530 bytes per header this is ~500kb per batch. `EthFilter::new` uses this value as the
/// `max_headers_range` of every instance.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Threshold for enabling parallel processing in range mode.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Default concurrency for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
/// `Eth` filter RPC implementation.
///
/// This type handles the filter-related `eth_` RPC requests: `eth_newFilter`,
/// `eth_newBlockFilter`, `eth_newPendingTransactionFilter`, `eth_getFilterChanges`,
/// `eth_getFilterLogs`, `eth_uninstallFilter` and `eth_getLogs`.
///
/// The handle only holds an [`Arc`], so clones share the same set of installed filters.
pub struct EthFilter<Eth: EthApiTypes> {
    /// All nested fields bundled together and shared between clones.
    inner: Arc<EthFilterInner<Eth>>,
}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101    Eth: EthApiTypes,
102{
103    fn clone(&self) -> Self {
104        Self { inner: self.inner.clone() }
105    }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110    Eth: EthApiTypes + 'static,
111{
112    /// Creates a new, shareable instance.
113    ///
114    /// This uses the given pool to get notified about new transactions, the provider to interact
115    /// with the blockchain, the cache to fetch cacheable data, like the logs.
116    ///
117    /// See also [`EthFilterConfig`].
118    ///
119    /// This also spawns a task that periodically clears stale filters.
120    ///
121    /// # Create a new instance with [`EthApi`](crate::EthApi)
122    ///
123    /// ```no_run
124    /// use reth_evm_ethereum::EthEvmConfig;
125    /// use reth_network_api::noop::NoopNetwork;
126    /// use reth_provider::noop::NoopProvider;
127    /// use reth_rpc::{EthApi, EthFilter};
128    /// use reth_tasks::TokioTaskExecutor;
129    /// use reth_transaction_pool::noop::NoopTransactionPool;
130    /// let eth_api = EthApi::builder(
131    ///     NoopProvider::default(),
132    ///     NoopTransactionPool::default(),
133    ///     NoopNetwork::default(),
134    ///     EthEvmConfig::mainnet(),
135    /// )
136    /// .build();
137    /// let filter = EthFilter::new(eth_api, Default::default(), TokioTaskExecutor::default().boxed());
138    /// ```
139    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
140        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141            config;
142        let inner = EthFilterInner {
143            eth_api,
144            active_filters: ActiveFilters::new(),
145            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146            max_headers_range: MAX_HEADERS_RANGE,
147            task_spawner,
148            stale_filter_ttl,
149            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150        };
151
152        let eth_filter = Self { inner: Arc::new(inner) };
153
154        let this = eth_filter.clone();
155        eth_filter.inner.task_spawner.spawn_critical(
156            "eth-filters_stale-filters-clean",
157            Box::pin(async move {
158                this.watch_and_clear_stale_filters().await;
159            }),
160        );
161
162        eth_filter
163    }
164
    /// Returns a reference to the shared collection of all currently active filters.
    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
        &self.inner.active_filters
    }
169
170    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
171    /// Nonetheless, this endless future frees the thread at every await point.
172    async fn watch_and_clear_stale_filters(&self) {
173        let mut interval = tokio::time::interval_at(
174            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175            self.inner.stale_filter_ttl,
176        );
177        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178        loop {
179            interval.tick().await;
180            self.clear_stale_filters(Instant::now()).await;
181        }
182    }
183
184    /// Clears all filters that have not been polled for longer than the configured
185    /// `stale_filter_ttl` at the given instant.
186    pub async fn clear_stale_filters(&self, now: Instant) {
187        trace!(target: "rpc::eth", "clear stale filters");
188        let mut filters = self.active_filters().inner.lock().await;
189        filters.retain(|id, filter| {
190            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192            if !is_valid {
193                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194            }
195
196            is_valid
197        });
198        filters.shrink_to_fit();
199    }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205        + RpcNodeCoreExt
206        + LoadReceipt
207        + EthBlocks
208        + 'static,
209{
    /// Access the underlying provider of the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.inner.eth_api.provider()
    }
214
    /// Access the underlying transaction pool of the wrapped `eth` API.
    fn pool(&self) -> &Eth::Pool {
        self.inner.eth_api.pool()
    }
219
220    /// Returns all the filter changes for the given id, if any
221    pub async fn filter_changes(
222        &self,
223        id: FilterId,
224    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225        let info = self.provider().chain_info()?;
226        let best_number = info.best_number;
227
228        // start_block is the block from which we should start fetching changes, the next block from
229        // the last time changes were polled, in other words the best block at last poll + 1
230        let (start_block, kind) = {
231            let mut filters = self.inner.active_filters.inner.lock().await;
232            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234            if filter.block > best_number {
235                // no new blocks since the last poll
236                return Ok(FilterChanges::Empty)
237            }
238
239            // update filter
240            // we fetch all changes from [filter.block..best_block], so we advance the filter's
241            // block to `best_block +1`, the next from which we should start fetching changes again
242            let mut block = best_number + 1;
243            std::mem::swap(&mut filter.block, &mut block);
244            filter.last_poll_timestamp = Instant::now();
245
246            (block, filter.kind.clone())
247        };
248
249        match kind {
250            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251            FilterKind::Block => {
252                // Note: we need to fetch the block hashes from inclusive range
253                // [start_block..best_block]
254                let end_block = best_number + 1;
255                let block_hashes =
256                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
257                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258                    )?;
259                Ok(FilterChanges::Hashes(block_hashes))
260            }
261            FilterKind::Log(filter) => {
262                let (from_block_number, to_block_number) = match filter.block_option {
263                    FilterBlockOption::Range { from_block, to_block } => {
264                        let from = from_block
265                            .map(|num| self.provider().convert_block_number(num))
266                            .transpose()?
267                            .flatten();
268                        let to = to_block
269                            .map(|num| self.provider().convert_block_number(num))
270                            .transpose()?
271                            .flatten();
272                        logs_utils::get_filter_block_range(from, to, start_block, info)?
273                    }
274                    FilterBlockOption::AtBlockHash(_) => {
275                        // blockHash is equivalent to fromBlock = toBlock = the block number with
276                        // hash blockHash
277                        // get_logs_in_block_range is inclusive
278                        (start_block, best_number)
279                    }
280                };
281                let logs = self
282                    .inner
283                    .clone()
284                    .get_logs_in_block_range(
285                        *filter,
286                        from_block_number,
287                        to_block_number,
288                        self.inner.query_limits,
289                    )
290                    .await?;
291                Ok(FilterChanges::Logs(logs))
292            }
293        }
294    }
295
296    /// Returns an array of all logs matching filter with given id.
297    ///
298    /// Returns an error if no matching log filter exists.
299    ///
300    /// Handler for `eth_getFilterLogs`
301    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
302        let filter = {
303            let mut filters = self.inner.active_filters.inner.lock().await;
304            let filter =
305                filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
306            if let FilterKind::Log(ref inner_filter) = filter.kind {
307                filter.last_poll_timestamp = Instant::now();
308                *inner_filter.clone()
309            } else {
310                // Not a log filter
311                return Err(EthFilterError::FilterNotFound(id))
312            }
313        };
314
315        self.logs_for_filter(filter, self.inner.query_limits).await
316    }
317
318    /// Returns logs matching given filter object.
319    async fn logs_for_filter(
320        &self,
321        filter: Filter,
322        limits: QueryLimits,
323    ) -> Result<Vec<Log>, EthFilterError> {
324        self.inner.clone().logs_for_filter(filter, limits).await
325    }
326}
327
328#[async_trait]
329impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
330where
331    Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
332{
333    /// Handler for `eth_newFilter`
334    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
335        trace!(target: "rpc::eth", "Serving eth_newFilter");
336        self.inner
337            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
338            .await
339    }
340
341    /// Handler for `eth_newBlockFilter`
342    async fn new_block_filter(&self) -> RpcResult<FilterId> {
343        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
344        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
345    }
346
347    /// Handler for `eth_newPendingTransactionFilter`
348    async fn new_pending_transaction_filter(
349        &self,
350        kind: Option<PendingTransactionFilterKind>,
351    ) -> RpcResult<FilterId> {
352        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
353
354        let transaction_kind = match kind.unwrap_or_default() {
355            PendingTransactionFilterKind::Hashes => {
356                let receiver = self.pool().pending_transactions_listener();
357                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
358                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
359            }
360            PendingTransactionFilterKind::Full => {
361                let stream = self.pool().new_pending_pool_transactions_listener();
362                let full_txs_receiver = FullTransactionsReceiver::new(
363                    stream,
364                    dyn_clone::clone(self.inner.eth_api.converter()),
365                );
366                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
367                    full_txs_receiver,
368                )))
369            }
370        };
371
372        // Install the filter and propagate any errors
373        self.inner.install_filter(transaction_kind).await
374    }
375
376    /// Handler for `eth_getFilterChanges`
377    async fn filter_changes(
378        &self,
379        id: FilterId,
380    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
381        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
382        Ok(Self::filter_changes(self, id).await?)
383    }
384
385    /// Returns an array of all logs matching filter with given id.
386    ///
387    /// Returns an error if no matching log filter exists.
388    ///
389    /// Handler for `eth_getFilterLogs`
390    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
391        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
392        Ok(Self::filter_logs(self, id).await?)
393    }
394
395    /// Handler for `eth_uninstallFilter`
396    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
397        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
398        let mut filters = self.inner.active_filters.inner.lock().await;
399        if filters.remove(&id).is_some() {
400            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
401            Ok(true)
402        } else {
403            Ok(false)
404        }
405    }
406
407    /// Returns logs matching given filter object.
408    ///
409    /// Handler for `eth_getLogs`
410    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
411        trace!(target: "rpc::eth", "Serving eth_getLogs");
412        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
413    }
414}
415
416impl<Eth> std::fmt::Debug for EthFilter<Eth>
417where
418    Eth: EthApiTypes,
419{
420    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421        f.debug_struct("EthFilter").finish_non_exhaustive()
422    }
423}
424
/// Container type for the state shared by all clones of an `EthFilter`.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Limits for log queries (maximum block range and maximum number of logs per response).
    query_limits: QueryLimits,
    /// Maximum number of headers to read at once for a range filter.
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
443
444impl<Eth> EthFilterInner<Eth>
445where
446    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
447        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
448        + LoadReceipt
449        + EthBlocks
450        + 'static,
451{
    /// Access the underlying provider of the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }
456
    /// Access the underlying [`EthStateCache`] of the wrapped `eth` API.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
461
    /// Returns logs matching given filter object.
    ///
    /// The lookup strategy depends on the filter's block option:
    /// - `AtBlockHash`: the receipts (and the block, if cached) of exactly that block are
    ///   fetched through the cache and matched against the filter.
    /// - `Range`: `from_block`/`to_block` are resolved to block numbers and the query is
    ///   delegated to [`Self::get_logs_in_block_range`], bounded by `limits`. A range that
    ///   starts at the pending block is answered from the local pending block, provided that
    ///   block is available and ahead of the current best block.
    ///
    /// Returns an error if:
    ///  - the block requested by hash (or its header) cannot be found
    ///  - the requested block, or the start of the requested range, is below the provider's
    ///    earliest block number (pruned history, EIP-4444)
    ///  - the resolved `to_block` is above the current best block
    ///  - the provider, the cache or the range query fails
    async fn logs_for_filter(
        self: Arc<Self>,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        match filter.block_option {
            FilterBlockOption::AtBlockHash(block_hash) => {
                // First try to get cached block and receipts, as it's likely they're already cached
                let Some((receipts, maybe_block)) =
                    self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
                else {
                    return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
                };

                // Get header - from cached block if available, otherwise from provider
                let header = if let Some(block) = &maybe_block {
                    block.header().clone()
                } else {
                    self.provider()
                        .header_by_hash_or_number(block_hash.into())?
                        .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?
                };

                // Check if the block has been pruned (EIP-4444)
                let earliest_block = self.provider().earliest_block_number()?;
                if header.number() < earliest_block {
                    return Err(EthApiError::PrunedHistoryUnavailable.into());
                }

                let block_num_hash = BlockNumHash::new(header.number(), block_hash);

                let mut all_logs = Vec::new();
                // Hand over the cached block if there is one, otherwise fall back to the provider.
                append_matching_block_logs(
                    &mut all_logs,
                    maybe_block
                        .map(ProviderOrBlock::Block)
                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                    &filter,
                    block_num_hash,
                    &receipts,
                    false,
                    header.timestamp(),
                )?;

                Ok(all_logs)
            }
            FilterBlockOption::Range { from_block, to_block } => {
                // Handle special case where from block is pending
                if from_block.is_some_and(|b| b.is_pending()) {
                    // A missing `to_block` is treated as "pending" as well.
                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
                    if !(to_block.is_pending() || to_block.is_number()) {
                        // always empty range
                        return Ok(Vec::new());
                    }
                    // Try to get pending block and receipts
                    // Note: an error or a missing pending block is not reported here; the
                    // request then continues with the regular range handling below.
                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
                        if let BlockNumberOrTag::Number(to_block) = to_block &&
                            to_block < pending_block.block.number()
                        {
                            // this block range is empty based on the user input
                            return Ok(Vec::new());
                        }

                        let info = self.provider().chain_info()?;
                        if pending_block.block.number() > info.best_number {
                            // only consider the pending block if it is ahead of the chain
                            let mut all_logs = Vec::new();
                            let timestamp = pending_block.block.timestamp();
                            let block_num_hash = pending_block.block.num_hash();
                            append_matching_block_logs(
                                &mut all_logs,
                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
                                &filter,
                                block_num_hash,
                                &pending_block.receipts,
                                false, // removed = false for pending blocks
                                timestamp,
                            )?;
                            return Ok(all_logs);
                        }
                    }
                }

                let info = self.provider().chain_info()?;
                let start_block = info.best_number;
                // Resolve tags (e.g. `latest`) to concrete block numbers, if possible.
                let from = from_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();
                let to = to_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();

                // Return error if toBlock exceeds current head
                if let Some(t) = to &&
                    t > info.best_number
                {
                    return Err(EthFilterError::BlockRangeExceedsHead);
                }

                if let Some(f) = from &&
                    f > info.best_number
                {
                    // start block higher than local head, can return empty
                    return Ok(Vec::new());
                }

                let (from_block_number, to_block_number) =
                    logs_utils::get_filter_block_range(from, to, start_block, info)?;

                // Check if the requested range overlaps with pruned history (EIP-4444)
                let earliest_block = self.provider().earliest_block_number()?;
                if from_block_number < earliest_block {
                    return Err(EthApiError::PrunedHistoryUnavailable.into());
                }

                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
                    .await
            }
        }
    }
585
586    /// Installs a new filter and returns the new identifier.
587    async fn install_filter(
588        &self,
589        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
590    ) -> RpcResult<FilterId> {
591        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
592        let subscription_id = self.id_provider.next_id();
593
594        let id = match subscription_id {
595            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
596            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
597        };
598        let mut filters = self.active_filters.inner.lock().await;
599        filters.insert(
600            id.clone(),
601            ActiveFilter {
602                block: last_poll_block_number,
603                last_poll_timestamp: Instant::now(),
604                kind,
605            },
606        );
607        Ok(id)
608    }
609
610    /// Returns all logs in the given _inclusive_ range that match the filter
611    ///
612    /// Returns an error if:
613    ///  - underlying database error
614    ///  - amount of matches exceeds configured limit
615    async fn get_logs_in_block_range(
616        self: Arc<Self>,
617        filter: Filter,
618        from_block: u64,
619        to_block: u64,
620        limits: QueryLimits,
621    ) -> Result<Vec<Log>, EthFilterError> {
622        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
623
624        // perform boundary checks first
625        if to_block < from_block {
626            return Err(EthFilterError::InvalidBlockRangeParams)
627        }
628
629        if let Some(max_blocks_per_filter) =
630            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
631        {
632            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
633        }
634
635        let (tx, rx) = oneshot::channel();
636        let this = self.clone();
637        self.task_spawner.spawn_blocking(Box::pin(async move {
638            let res =
639                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
640            let _ = tx.send(res);
641        }));
642
643        rx.await.map_err(|_| EthFilterError::InternalError)?
644    }
645
    /// Returns all logs in the given _inclusive_ range that match the filter
    ///
    /// Works in two phases: first the headers of the range are read in batches of at most
    /// `max_headers_range` and only those whose logs bloom matches the filter are kept; then
    /// receipts (and blocks, where available) are fetched for the kept headers and the logs
    /// matching the filter are collected.
    ///
    /// Note: This function uses a mix of blocking db operations for fetching indices and header
    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
    /// This function is considered blocking and should thus be spawned on a blocking task.
    ///
    /// Returns an error if:
    ///  - underlying database error
    ///  - the range covers more than one block and the number of collected logs exceeds
    ///    `limits.max_logs_per_response`
    async fn get_logs_in_block_range_inner(
        self: Arc<Self>,
        filter: &Filter,
        from_block: u64,
        to_block: u64,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        let mut all_logs = Vec::new();
        let mut matching_headers = Vec::new();

        // get current chain tip to determine processing mode
        let chain_tip = self.provider().best_block_number()?;

        // first collect all headers that match the bloom filter for cached mode decision
        for (from, to) in
            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
        {
            let headers = self.provider().headers_range(from..=to)?;

            // Peekable so the hash of a header can be taken from its successor's parent hash.
            let mut headers_iter = headers.into_iter().peekable();

            while let Some(header) = headers_iter.next() {
                if !filter.matches_bloom(header.logs_bloom()) {
                    continue
                }

                let current_number = header.number();

                let block_hash = match headers_iter.peek() {
                    Some(next_header) if next_header.number() == current_number + 1 => {
                        // Headers are consecutive, use the more efficient parent_hash
                        next_header.parent_hash()
                    }
                    _ => {
                        // Headers not consecutive or last header, calculate hash
                        header.hash_slow()
                    }
                };

                matching_headers.push(SealedHeader::new(header, block_hash));
            }
        }

        // initialize the appropriate range mode based on collected headers
        let mut range_mode = RangeMode::new(
            self.clone(),
            matching_headers,
            from_block,
            to_block,
            self.max_headers_range,
            chain_tip,
        );

        // iterate through the range mode to get receipts and blocks
        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
            range_mode.next().await?
        {
            let num_hash = header.num_hash();
            // Hand over the recovered block if there is one, otherwise fall back to the provider.
            append_matching_block_logs(
                &mut all_logs,
                recovered_block
                    .map(ProviderOrBlock::Block)
                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                filter,
                num_hash,
                &receipts,
                false,
                header.timestamp(),
            )?;

            // size check but only if range is multiple blocks, so we always return all
            // logs of a single block
            let is_multi_block_range = from_block != to_block;
            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
                is_multi_block_range &&
                all_logs.len() > max_logs_per_response
            {
                debug!(
                    target: "rpc::eth::filter",
                    logs_found = all_logs.len(),
                    max_logs_per_response,
                    from_block,
                    to_block = num_hash.number,
                    "Query exceeded max logs per response limit"
                );
                // `to_block` in the error is the block at which the limit was exceeded, not the
                // end of the requested range.
                return Err(EthFilterError::QueryExceedsMaxResults {
                    max_logs: max_logs_per_response,
                    from_block,
                    to_block: num_hash.number,
                });
            }
        }

        Ok(all_logs)
    }
749}
750
/// All active filters
///
/// A cheaply cloneable handle: the map from filter id to installed filter lives behind a shared
/// [`Arc`] and is guarded by an async [`Mutex`].
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters<T> {
    /// The installed filters, keyed by their id.
    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
}
756
757impl<T> ActiveFilters<T> {
758    /// Returns an empty instance.
759    pub fn new() -> Self {
760        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
761    }
762}
763
/// An installed filter
#[derive(Debug)]
struct ActiveFilter<T> {
    /// The block number from which the next poll starts fetching changes.
    ///
    /// Set to the best block number when the filter is installed and advanced to the best block
    /// number + 1 whenever a poll observes new blocks.
    block: u64,
    /// Last time this filter was polled.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
774
/// A receiver for pending transactions that returns all new transactions since the last poll.
///
/// Clones share the same underlying channel receiver.
#[derive(Debug, Clone)]
struct PendingTransactionsReceiver {
    /// Channel of pending transaction hashes, guarded by an async mutex so that it can be
    /// drained through a shared reference.
    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
}
780
781impl PendingTransactionsReceiver {
782    fn new(receiver: Receiver<TxHash>) -> Self {
783        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
784    }
785
786    /// Returns all new pending transactions received since the last poll.
787    async fn drain<T>(&self) -> FilterChanges<T> {
788        let mut pending_txs = Vec::new();
789        let mut prepared_stream = self.txs_receiver.lock().await;
790
791        while let Ok(tx_hash) = prepared_stream.try_recv() {
792            pending_txs.push(tx_hash);
793        }
794
795        // Convert the vector of hashes into FilterChanges::Hashes
796        FilterChanges::Hashes(pending_txs)
797    }
798}
799
800/// A structure to manage and provide access to a stream of full transaction details.
801#[derive(Debug, Clone)]
802struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
803    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
804    converter: TxCompat,
805}
806
807impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
808where
809    T: PoolTransaction + 'static,
810    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
811{
812    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
813    fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
814        Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
815    }
816
817    /// Returns all new pending transactions received since the last poll.
818    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
819        let mut pending_txs = Vec::new();
820        let mut prepared_stream = self.txs_stream.lock().await;
821
822        while let Ok(tx) = prepared_stream.try_recv() {
823            match self.converter.fill_pending(tx.transaction.to_consensus()) {
824                Ok(tx) => pending_txs.push(tx),
825                Err(err) => {
826                    error!(target: "rpc",
827                        %err,
828                        "Failed to fill txn with block context"
829                    );
830                }
831            }
832        }
833        FilterChanges::Transactions(pending_txs)
834    }
835}
836
837/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
838#[async_trait]
839trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
840    async fn drain(&self) -> FilterChanges<T>;
841}
842
843#[async_trait]
844impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
845    for FullTransactionsReceiver<T, TxCompat>
846where
847    T: PoolTransaction + 'static,
848    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
849{
850    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
851        Self::drain(self).await
852    }
853}
854
855/// Represents the kind of pending transaction data that can be retrieved.
856///
857/// This enum differentiates between two kinds of pending transaction data:
858/// - Just the transaction hashes.
859/// - Full transaction details.
860#[derive(Debug, Clone)]
861enum PendingTransactionKind<T> {
862    Hashes(PendingTransactionsReceiver),
863    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
864}
865
866impl<T: 'static> PendingTransactionKind<T> {
867    async fn drain(&self) -> FilterChanges<T> {
868        match self {
869            Self::Hashes(receiver) => receiver.drain().await,
870            Self::FullTransaction(receiver) => receiver.drain().await,
871        }
872    }
873}
874
875#[derive(Clone, Debug)]
876enum FilterKind<T> {
877    Log(Box<Filter>),
878    Block,
879    PendingTransaction(PendingTransactionKind<T>),
880}
881
/// An iterator that yields _inclusive_ `(start, end)` block ranges of a given step size.
///
/// The yielded ranges are contiguous and non-overlapping and together cover the input range.
/// Each one spans at most `step + 1` blocks (`end - start <= step`); the final range is cut
/// off at the end of the input range. An empty input range yields nothing.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block number of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block number (inclusive) of the overall range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose sub-ranges each satisfy `end - start <= step`.
    ///
    /// If `step + 1` is not representable as `usize`, the stride is clamped to `usize::MAX`
    /// and the stored step is reduced to match, so the sub-ranges stay contiguous and
    /// non-overlapping instead of overflowing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Consecutive starts are `step + 1` apart; compute that without overflow/truncation.
        let stride =
            usize::try_from(step).ok().and_then(|step| step.checked_add(1)).unwrap_or(usize::MAX);
        // Keep the stored step consistent with the stride that is actually used.
        // `stride >= 1`, and widening `usize` to `u64` is lossless on supported targets.
        let step = (stride - 1) as u64;

        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate so that ranges ending near `u64::MAX` cannot overflow. `start` never exceeds
        // `self.end` because it comes from the underlying range, hence `start <= end` holds.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
908
909/// Errors that can occur in the handler implementation
910#[derive(Debug, thiserror::Error)]
911pub enum EthFilterError {
912    /// Filter not found.
913    #[error("filter not found")]
914    FilterNotFound(FilterId),
915    /// Invalid block range.
916    #[error("invalid block range params")]
917    InvalidBlockRangeParams,
918    /// Block range extends beyond current head.
919    #[error("block range extends beyond current head block")]
920    BlockRangeExceedsHead,
921    /// Query scope is too broad.
922    #[error("query exceeds max block range {0}")]
923    QueryExceedsMaxBlocks(u64),
924    /// Query result is too large.
925    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
926    QueryExceedsMaxResults {
927        /// Maximum number of logs allowed per response
928        max_logs: usize,
929        /// Start block of the suggested retry range
930        from_block: u64,
931        /// End block of the suggested retry range (last successfully processed block)
932        to_block: u64,
933    },
934    /// Error serving request in `eth_` namespace.
935    #[error(transparent)]
936    EthAPIError(#[from] EthApiError),
937    /// Error thrown when a spawned task failed to deliver a response.
938    #[error("internal filter error")]
939    InternalError,
940}
941
942impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
943    fn from(err: EthFilterError) -> Self {
944        match err {
945            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
946                jsonrpsee::types::error::INVALID_PARAMS_CODE,
947                "filter not found",
948            ),
949            err @ EthFilterError::InternalError => {
950                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
951            }
952            EthFilterError::EthAPIError(err) => err.into(),
953            err @ (EthFilterError::InvalidBlockRangeParams |
954            EthFilterError::QueryExceedsMaxBlocks(_) |
955            EthFilterError::QueryExceedsMaxResults { .. } |
956            EthFilterError::BlockRangeExceedsHead) => {
957                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
958            }
959        }
960    }
961}
962
963impl From<ProviderError> for EthFilterError {
964    fn from(err: ProviderError) -> Self {
965        Self::EthAPIError(err.into())
966    }
967}
968
969impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
970    fn from(err: logs_utils::FilterBlockRangeError) -> Self {
971        match err {
972            logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
973            logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
974        }
975    }
976}
977
978/// Helper type for the common pattern of returning receipts, block and the original header that is
979/// a match for the filter.
980struct ReceiptBlockResult<P>
981where
982    P: ReceiptProvider + BlockReader,
983{
984    /// We always need the entire receipts for the matching block.
985    receipts: Arc<Vec<ProviderReceipt<P>>>,
986    /// Block can be optional and we can fetch it lazily when needed.
987    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
988    /// The header of the block.
989    header: SealedHeader<<P as HeaderProvider>::Header>,
990}
991
992/// Represents different modes for processing block ranges when filtering logs
993enum RangeMode<
994    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
995        + EthApiTypes
996        + LoadReceipt
997        + EthBlocks
998        + 'static,
999> {
1000    /// Use cache-based processing for recent blocks
1001    Cached(CachedMode<Eth>),
1002    /// Use range-based processing for older blocks
1003    Range(RangeBlockMode<Eth>),
1004}
1005
1006impl<
1007        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1008            + EthApiTypes
1009            + LoadReceipt
1010            + EthBlocks
1011            + 'static,
1012    > RangeMode<Eth>
1013{
1014    /// Creates a new `RangeMode`.
1015    fn new(
1016        filter_inner: Arc<EthFilterInner<Eth>>,
1017        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1018        from_block: u64,
1019        to_block: u64,
1020        max_headers_range: u64,
1021        chain_tip: u64,
1022    ) -> Self {
1023        let block_count = to_block - from_block + 1;
1024        let distance_from_tip = chain_tip.saturating_sub(to_block);
1025
1026        // Determine if we should use cached mode based on range characteristics
1027        let use_cached_mode =
1028            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1029
1030        if use_cached_mode && !sealed_headers.is_empty() {
1031            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1032        } else {
1033            Self::Range(RangeBlockMode {
1034                filter_inner,
1035                iter: sealed_headers.into_iter().peekable(),
1036                next: VecDeque::new(),
1037                max_range: max_headers_range as usize,
1038                pending_tasks: FuturesOrdered::new(),
1039            })
1040        }
1041    }
1042
1043    /// Determines whether to use cached mode based on bloom filter matches and range size
1044    const fn should_use_cached_mode(
1045        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1046        block_count: u64,
1047        distance_from_tip: u64,
1048    ) -> bool {
1049        // Headers are already filtered by bloom, so count equals length
1050        let bloom_matches = headers.len();
1051
1052        // Calculate adjusted threshold based on bloom matches
1053        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1054
1055        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1056    }
1057
1058    /// Calculates the adjusted cache threshold based on bloom filter matches
1059    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1060        // Only apply adjustments for larger ranges
1061        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1062            return CACHED_MODE_BLOCK_THRESHOLD;
1063        }
1064
1065        match bloom_matches {
1066            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1067            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1068            _ => CACHED_MODE_BLOCK_THRESHOLD,
1069        }
1070    }
1071
1072    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
1073    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1074        match self {
1075            Self::Cached(cached) => cached.next().await,
1076            Self::Range(range) => range.next().await,
1077        }
1078    }
1079}
1080
1081/// Mode for processing blocks using cache optimization for recent blocks
1082struct CachedMode<
1083    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1084        + EthApiTypes
1085        + LoadReceipt
1086        + EthBlocks
1087        + 'static,
1088> {
1089    filter_inner: Arc<EthFilterInner<Eth>>,
1090    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1091}
1092
1093impl<
1094        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1095            + EthApiTypes
1096            + LoadReceipt
1097            + EthBlocks
1098            + 'static,
1099    > CachedMode<Eth>
1100{
1101    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1102        for header in self.headers_iter.by_ref() {
1103            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1104            if let Some((receipts, maybe_block)) =
1105                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1106            {
1107                return Ok(Some(ReceiptBlockResult {
1108                    receipts,
1109                    recovered_block: maybe_block,
1110                    header,
1111                }));
1112            }
1113        }
1114
1115        Ok(None) // No more headers
1116    }
1117}
1118
1119/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1120type ReceiptFetchFuture<P> =
1121    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1122
1123/// Mode for processing blocks using range queries for older blocks
1124struct RangeBlockMode<
1125    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1126        + EthApiTypes
1127        + LoadReceipt
1128        + EthBlocks
1129        + 'static,
1130> {
1131    filter_inner: Arc<EthFilterInner<Eth>>,
1132    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1133    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1134    max_range: usize,
1135    // Stream of ongoing receipt fetching tasks
1136    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1137}
1138
1139impl<
1140        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1141            + EthApiTypes
1142            + LoadReceipt
1143            + EthBlocks
1144            + 'static,
1145    > RangeBlockMode<Eth>
1146{
1147    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1148        loop {
1149            // First, try to return any already processed result from buffer
1150            if let Some(result) = self.next.pop_front() {
1151                return Ok(Some(result));
1152            }
1153
1154            // Try to get a completed task result if there are pending tasks
1155            if let Some(task_result) = self.pending_tasks.next().await {
1156                self.next.extend(task_result?);
1157                continue;
1158            }
1159
1160            // No pending tasks - try to generate more work
1161            let Some(next_header) = self.iter.next() else {
1162                // No more headers to process
1163                return Ok(None);
1164            };
1165
1166            let mut range_headers = Vec::with_capacity(self.max_range);
1167            range_headers.push(next_header);
1168
1169            // Collect consecutive blocks up to max_range size
1170            while range_headers.len() < self.max_range {
1171                let Some(peeked) = self.iter.peek() else { break };
1172                let Some(last_header) = range_headers.last() else { break };
1173
1174                let expected_next = last_header.number() + 1;
1175                if peeked.number() != expected_next {
1176                    trace!(
1177                        target: "rpc::eth::filter",
1178                        last_block = last_header.number(),
1179                        next_block = peeked.number(),
1180                        expected = expected_next,
1181                        range_size = range_headers.len(),
1182                        "Non-consecutive block detected, stopping range collection"
1183                    );
1184                    break; // Non-consecutive block, stop here
1185                }
1186
1187                let Some(next_header) = self.iter.next() else { break };
1188                range_headers.push(next_header);
1189            }
1190
1191            // Check if we should use parallel processing for large ranges
1192            let remaining_headers = self.iter.len() + range_headers.len();
1193            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1194                self.spawn_parallel_tasks(range_headers);
1195                // Continue loop to await the spawned tasks
1196            } else {
1197                // Process small range sequentially and add results to buffer
1198                if let Some(result) = self.process_small_range(range_headers).await? {
1199                    return Ok(Some(result));
1200                }
1201                // Continue loop to check for more work
1202            }
1203        }
1204    }
1205
1206    /// Process a small range of headers sequentially
1207    ///
1208    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1209    async fn process_small_range(
1210        &mut self,
1211        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1212    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1213        // Process each header individually to avoid queuing for all receipts
1214        for header in range_headers {
1215            // First check if already cached to avoid unnecessary provider calls
1216            let (maybe_block, maybe_receipts) = self
1217                .filter_inner
1218                .eth_cache()
1219                .maybe_cached_block_and_receipts(header.hash())
1220                .await?;
1221
1222            let receipts = match maybe_receipts {
1223                Some(receipts) => receipts,
1224                None => {
1225                    // Not cached - fetch directly from provider
1226                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1227                        Some(receipts) => Arc::new(receipts),
1228                        None => continue, // No receipts found
1229                    }
1230                }
1231            };
1232
1233            if !receipts.is_empty() {
1234                self.next.push_back(ReceiptBlockResult {
1235                    receipts,
1236                    recovered_block: maybe_block,
1237                    header,
1238                });
1239            }
1240        }
1241
1242        Ok(self.next.pop_front())
1243    }
1244
1245    /// Spawn parallel tasks for processing a large range of headers
1246    ///
1247    /// This is used when the remaining headers count is at or above
1248    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1249    fn spawn_parallel_tasks(
1250        &mut self,
1251        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1252    ) {
1253        // Split headers into chunks
1254        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1255        let header_chunks = range_headers
1256            .into_iter()
1257            .chunks(chunk_size)
1258            .into_iter()
1259            .map(|chunk| chunk.collect::<Vec<_>>())
1260            .collect::<Vec<_>>();
1261
1262        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1263        for chunk_headers in header_chunks {
1264            let filter_inner = self.filter_inner.clone();
1265            let chunk_task = Box::pin(async move {
1266                let chunk_task = tokio::task::spawn_blocking(move || {
1267                    let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1268
1269                    for header in chunk_headers {
1270                        // Fetch directly from provider - RangeMode is used for older blocks
1271                        // unlikely to be cached
1272                        let receipts = match filter_inner
1273                            .provider()
1274                            .receipts_by_block(header.hash().into())?
1275                        {
1276                            Some(receipts) => Arc::new(receipts),
1277                            None => continue, // No receipts found
1278                        };
1279
1280                        if !receipts.is_empty() {
1281                            chunk_results.push(ReceiptBlockResult {
1282                                receipts,
1283                                recovered_block: None,
1284                                header,
1285                            });
1286                        }
1287                    }
1288
1289                    Ok(chunk_results)
1290                });
1291
1292                // Await the blocking task and handle the result
1293                match chunk_task.await {
1294                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1295                    Ok(Err(e)) => Err(e),
1296                    Err(join_err) => {
1297                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1298                        Err(EthFilterError::InternalError)
1299                    }
1300                }
1301            });
1302
1303            self.pending_tasks.push_back(chunk_task);
1304        }
1305    }
1306}
1307
1308#[cfg(test)]
1309mod tests {
1310    use super::*;
1311    use crate::{eth::EthApi, EthApiBuilder};
1312    use alloy_network::Ethereum;
1313    use alloy_primitives::FixedBytes;
1314    use rand::Rng;
1315    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1316    use reth_ethereum_primitives::TxType;
1317    use reth_evm_ethereum::EthEvmConfig;
1318    use reth_network_api::noop::NoopNetwork;
1319    use reth_provider::test_utils::MockEthProvider;
1320    use reth_rpc_convert::RpcConverter;
1321    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1322    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1323    use reth_tasks::TokioTaskExecutor;
1324    use reth_testing_utils::generators;
1325    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1326    use std::{collections::VecDeque, sync::Arc};
1327
1328    #[test]
1329    fn test_block_range_iter() {
1330        let mut rng = generators::rng();
1331
1332        let start = rng.random::<u32>() as u64;
1333        let end = start.saturating_add(rng.random::<u32>() as u64);
1334        let step = rng.random::<u16>() as u64;
1335        let range = start..=end;
1336        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1337        let (from, mut end) = iter.next().unwrap();
1338        assert_eq!(from, start);
1339        assert_eq!(end, (from + step).min(*range.end()));
1340
1341        for (next_from, next_end) in iter {
1342            // ensure range starts with previous end + 1
1343            assert_eq!(next_from, end + 1);
1344            end = next_end;
1345        }
1346
1347        assert_eq!(end, *range.end());
1348    }
1349
1350    // Helper function to create a test EthApi instance
1351    #[expect(clippy::type_complexity)]
1352    fn build_test_eth_api(
1353        provider: MockEthProvider,
1354    ) -> EthApi<
1355        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1356        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1357    > {
1358        EthApiBuilder::new(
1359            provider.clone(),
1360            testing_pool(),
1361            NoopNetwork::default(),
1362            EthEvmConfig::new(provider.chain_spec()),
1363        )
1364        .build()
1365    }
1366
1367    #[tokio::test]
1368    async fn test_range_block_mode_empty_range() {
1369        let provider = MockEthProvider::default();
1370        let eth_api = build_test_eth_api(provider);
1371
1372        let eth_filter = super::EthFilter::new(
1373            eth_api,
1374            EthFilterConfig::default(),
1375            Box::new(TokioTaskExecutor::default()),
1376        );
1377        let filter_inner = eth_filter.inner;
1378
1379        let headers = vec![];
1380        let max_range = 100;
1381
1382        let mut range_mode = RangeBlockMode {
1383            filter_inner,
1384            iter: headers.into_iter().peekable(),
1385            next: VecDeque::new(),
1386            max_range,
1387            pending_tasks: FuturesOrdered::new(),
1388        };
1389
1390        let result = range_mode.next().await;
1391        assert!(result.is_ok());
1392        assert!(result.unwrap().is_none());
1393    }
1394
1395    #[tokio::test]
1396    async fn test_range_block_mode_queued_results_priority() {
1397        let provider = MockEthProvider::default();
1398        let eth_api = build_test_eth_api(provider);
1399
1400        let eth_filter = super::EthFilter::new(
1401            eth_api,
1402            EthFilterConfig::default(),
1403            Box::new(TokioTaskExecutor::default()),
1404        );
1405        let filter_inner = eth_filter.inner;
1406
1407        let headers = vec![
1408            SealedHeader::new(
1409                alloy_consensus::Header { number: 100, ..Default::default() },
1410                FixedBytes::random(),
1411            ),
1412            SealedHeader::new(
1413                alloy_consensus::Header { number: 101, ..Default::default() },
1414                FixedBytes::random(),
1415            ),
1416        ];
1417
1418        // create specific mock results to test ordering
1419        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1420        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1421
1422        // create mock receipts to test receipt handling
1423        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1424            tx_type: TxType::Legacy,
1425            cumulative_gas_used: 100_000,
1426            logs: vec![],
1427            success: true,
1428        };
1429        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1430            tx_type: TxType::Eip1559,
1431            cumulative_gas_used: 200_000,
1432            logs: vec![],
1433            success: true,
1434        };
1435        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1436            tx_type: TxType::Eip2930,
1437            cumulative_gas_used: 150_000,
1438            logs: vec![],
1439            success: false, // Different success status
1440        };
1441
1442        let mock_result_1 = ReceiptBlockResult {
1443            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1444            recovered_block: None,
1445            header: SealedHeader::new(
1446                alloy_consensus::Header { number: 42, ..Default::default() },
1447                expected_block_hash_1,
1448            ),
1449        };
1450
1451        let mock_result_2 = ReceiptBlockResult {
1452            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1453            recovered_block: None,
1454            header: SealedHeader::new(
1455                alloy_consensus::Header { number: 43, ..Default::default() },
1456                expected_block_hash_2,
1457            ),
1458        };
1459
1460        let mut range_mode = RangeBlockMode {
1461            filter_inner,
1462            iter: headers.into_iter().peekable(),
1463            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1464            max_range: 100,
1465            pending_tasks: FuturesOrdered::new(),
1466        };
1467
1468        // first call should return the first queued result (FIFO order)
1469        let result1 = range_mode.next().await;
1470        assert!(result1.is_ok());
1471        let receipt_result1 = result1.unwrap().unwrap();
1472        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1473        assert_eq!(receipt_result1.header.number, 42);
1474
1475        // verify receipts
1476        assert_eq!(receipt_result1.receipts.len(), 2);
1477        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1478        assert_eq!(
1479            receipt_result1.receipts[0].cumulative_gas_used,
1480            mock_receipt_1.cumulative_gas_used
1481        );
1482        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1483        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1484        assert_eq!(
1485            receipt_result1.receipts[1].cumulative_gas_used,
1486            mock_receipt_2.cumulative_gas_used
1487        );
1488        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1489
1490        // second call should return the second queued result
1491        let result2 = range_mode.next().await;
1492        assert!(result2.is_ok());
1493        let receipt_result2 = result2.unwrap().unwrap();
1494        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1495        assert_eq!(receipt_result2.header.number, 43);
1496
1497        // verify receipts
1498        assert_eq!(receipt_result2.receipts.len(), 1);
1499        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1500        assert_eq!(
1501            receipt_result2.receipts[0].cumulative_gas_used,
1502            mock_receipt_3.cumulative_gas_used
1503        );
1504        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1505
1506        // queue should now be empty
1507        assert!(range_mode.next.is_empty());
1508
1509        let result3 = range_mode.next().await;
1510        assert!(result3.is_ok());
1511    }
1512
1513    #[tokio::test]
1514    async fn test_range_block_mode_single_block_no_receipts() {
1515        let provider = MockEthProvider::default();
1516        let eth_api = build_test_eth_api(provider);
1517
1518        let eth_filter = super::EthFilter::new(
1519            eth_api,
1520            EthFilterConfig::default(),
1521            Box::new(TokioTaskExecutor::default()),
1522        );
1523        let filter_inner = eth_filter.inner;
1524
1525        let headers = vec![SealedHeader::new(
1526            alloy_consensus::Header { number: 100, ..Default::default() },
1527            FixedBytes::random(),
1528        )];
1529
1530        let mut range_mode = RangeBlockMode {
1531            filter_inner,
1532            iter: headers.into_iter().peekable(),
1533            next: VecDeque::new(),
1534            max_range: 100,
1535            pending_tasks: FuturesOrdered::new(),
1536        };
1537
1538        let result = range_mode.next().await;
1539        assert!(result.is_ok());
1540    }
1541
1542    #[tokio::test]
1543    async fn test_range_block_mode_provider_receipts() {
1544        let provider = MockEthProvider::default();
1545
1546        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1547        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1548        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1549
1550        let block_hash_1 = FixedBytes::random();
1551        let block_hash_2 = FixedBytes::random();
1552        let block_hash_3 = FixedBytes::random();
1553
1554        provider.add_header(block_hash_1, header_1.clone());
1555        provider.add_header(block_hash_2, header_2.clone());
1556        provider.add_header(block_hash_3, header_3.clone());
1557
1558        // create mock receipts to test provider fetching with mock logs
1559        let mock_log = alloy_primitives::Log {
1560            address: alloy_primitives::Address::ZERO,
1561            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1562        };
1563
1564        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1565            tx_type: TxType::Legacy,
1566            cumulative_gas_used: 21_000,
1567            logs: vec![mock_log.clone()],
1568            success: true,
1569        };
1570        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1571            tx_type: TxType::Eip1559,
1572            cumulative_gas_used: 42_000,
1573            logs: vec![mock_log.clone()],
1574            success: true,
1575        };
1576        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1577            tx_type: TxType::Eip2930,
1578            cumulative_gas_used: 30_000,
1579            logs: vec![mock_log.clone()],
1580            success: false,
1581        };
1582
1583        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1584        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1585
1586        let eth_api = build_test_eth_api(provider);
1587
1588        let eth_filter = super::EthFilter::new(
1589            eth_api,
1590            EthFilterConfig::default(),
1591            Box::new(TokioTaskExecutor::default()),
1592        );
1593        let filter_inner = eth_filter.inner;
1594
1595        let headers = vec![
1596            SealedHeader::new(header_1, block_hash_1),
1597            SealedHeader::new(header_2, block_hash_2),
1598            SealedHeader::new(header_3, block_hash_3),
1599        ];
1600
1601        let mut range_mode = RangeBlockMode {
1602            filter_inner,
1603            iter: headers.into_iter().peekable(),
1604            next: VecDeque::new(),
1605            max_range: 3, // include the 3 blocks in the first queried results
1606            pending_tasks: FuturesOrdered::new(),
1607        };
1608
1609        // first call should fetch receipts from provider and return first block with receipts
1610        let result = range_mode.next().await;
1611        assert!(result.is_ok());
1612        let receipt_result = result.unwrap().unwrap();
1613
1614        assert_eq!(receipt_result.header.hash(), block_hash_1);
1615        assert_eq!(receipt_result.header.number, 100);
1616        assert_eq!(receipt_result.receipts.len(), 2);
1617
1618        // verify receipts
1619        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1620        assert_eq!(
1621            receipt_result.receipts[0].cumulative_gas_used,
1622            receipt_100_1.cumulative_gas_used
1623        );
1624        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1625
1626        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1627        assert_eq!(
1628            receipt_result.receipts[1].cumulative_gas_used,
1629            receipt_100_2.cumulative_gas_used
1630        );
1631        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1632
1633        // second call should return the second block with receipts
1634        let result2 = range_mode.next().await;
1635        assert!(result2.is_ok());
1636        let receipt_result2 = result2.unwrap().unwrap();
1637
1638        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1639        assert_eq!(receipt_result2.header.number, 101);
1640        assert_eq!(receipt_result2.receipts.len(), 1);
1641
1642        // verify receipts
1643        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1644        assert_eq!(
1645            receipt_result2.receipts[0].cumulative_gas_used,
1646            receipt_101_1.cumulative_gas_used
1647        );
1648        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1649
1650        // third call should return None since no more blocks with receipts
1651        let result3 = range_mode.next().await;
1652        assert!(result3.is_ok());
1653        assert!(result3.unwrap().is_none());
1654    }
1655
1656    #[tokio::test]
1657    async fn test_range_block_mode_iterator_exhaustion() {
1658        let provider = MockEthProvider::default();
1659
1660        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1661        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1662
1663        let block_hash_100 = FixedBytes::random();
1664        let block_hash_101 = FixedBytes::random();
1665
1666        // Associate headers with hashes first
1667        provider.add_header(block_hash_100, header_100.clone());
1668        provider.add_header(block_hash_101, header_101.clone());
1669
1670        // Add mock receipts so headers are actually processed
1671        let mock_receipt = reth_ethereum_primitives::Receipt {
1672            tx_type: TxType::Legacy,
1673            cumulative_gas_used: 21_000,
1674            logs: vec![],
1675            success: true,
1676        };
1677        provider.add_receipts(100, vec![mock_receipt.clone()]);
1678        provider.add_receipts(101, vec![mock_receipt.clone()]);
1679
1680        let eth_api = build_test_eth_api(provider);
1681
1682        let eth_filter = super::EthFilter::new(
1683            eth_api,
1684            EthFilterConfig::default(),
1685            Box::new(TokioTaskExecutor::default()),
1686        );
1687        let filter_inner = eth_filter.inner;
1688
1689        let headers = vec![
1690            SealedHeader::new(header_100, block_hash_100),
1691            SealedHeader::new(header_101, block_hash_101),
1692        ];
1693
1694        let mut range_mode = RangeBlockMode {
1695            filter_inner,
1696            iter: headers.into_iter().peekable(),
1697            next: VecDeque::new(),
1698            max_range: 1,
1699            pending_tasks: FuturesOrdered::new(),
1700        };
1701
1702        let result1 = range_mode.next().await;
1703        assert!(result1.is_ok());
1704        assert!(result1.unwrap().is_some()); // Should have processed block 100
1705
1706        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1707
1708        let result2 = range_mode.next().await;
1709        assert!(result2.is_ok());
1710        assert!(result2.unwrap().is_some()); // Should have processed block 101
1711
1712        // now iterator should be exhausted
1713        assert!(range_mode.iter.peek().is_none());
1714
1715        // further calls should return None
1716        let result3 = range_mode.next().await;
1717        assert!(result3.is_ok());
1718        assert!(result3.unwrap().is_none());
1719    }
1720
1721    #[tokio::test]
1722    async fn test_cached_mode_with_mock_receipts() {
1723        // create test data
1724        let test_hash = FixedBytes::from([42u8; 32]);
1725        let test_block_number = 100u64;
1726        let test_header = SealedHeader::new(
1727            alloy_consensus::Header {
1728                number: test_block_number,
1729                gas_used: 50_000,
1730                ..Default::default()
1731            },
1732            test_hash,
1733        );
1734
1735        // add a mock receipt to the provider with a mock log
1736        let mock_log = alloy_primitives::Log {
1737            address: alloy_primitives::Address::ZERO,
1738            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1739        };
1740
1741        let mock_receipt = reth_ethereum_primitives::Receipt {
1742            tx_type: TxType::Legacy,
1743            cumulative_gas_used: 21_000,
1744            logs: vec![mock_log],
1745            success: true,
1746        };
1747
1748        let provider = MockEthProvider::default();
1749        provider.add_header(test_hash, test_header.header().clone());
1750        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1751
1752        let eth_api = build_test_eth_api(provider);
1753        let eth_filter = super::EthFilter::new(
1754            eth_api,
1755            EthFilterConfig::default(),
1756            Box::new(TokioTaskExecutor::default()),
1757        );
1758        let filter_inner = eth_filter.inner;
1759
1760        let headers = vec![test_header.clone()];
1761
1762        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1763
1764        // should find the receipt from provider fallback (cache will be empty)
1765        let result = cached_mode.next().await.expect("next should succeed");
1766        let receipt_block_result = result.expect("should have receipt result");
1767        assert_eq!(receipt_block_result.header.hash(), test_hash);
1768        assert_eq!(receipt_block_result.header.number, test_block_number);
1769        assert_eq!(receipt_block_result.receipts.len(), 1);
1770        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1771        assert_eq!(
1772            receipt_block_result.receipts[0].cumulative_gas_used,
1773            mock_receipt.cumulative_gas_used
1774        );
1775        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1776
1777        // iterator should be exhausted
1778        let result2 = cached_mode.next().await;
1779        assert!(result2.is_ok());
1780        assert!(result2.unwrap().is_none());
1781    }
1782
1783    #[tokio::test]
1784    async fn test_cached_mode_empty_headers() {
1785        let provider = MockEthProvider::default();
1786        let eth_api = build_test_eth_api(provider);
1787
1788        let eth_filter = super::EthFilter::new(
1789            eth_api,
1790            EthFilterConfig::default(),
1791            Box::new(TokioTaskExecutor::default()),
1792        );
1793        let filter_inner = eth_filter.inner;
1794
1795        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1796
1797        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1798
1799        // should immediately return None for empty headers
1800        let result = cached_mode.next().await.expect("next should succeed");
1801        assert!(result.is_none());
1802    }
1803
1804    #[tokio::test]
1805    async fn test_non_consecutive_headers_after_bloom_filter() {
1806        let provider = MockEthProvider::default();
1807
1808        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1809        let mut expected_hashes = vec![];
1810        let mut prev_hash = alloy_primitives::B256::default();
1811
1812        // Create a transaction for blocks that will have receipts
1813        use alloy_consensus::TxLegacy;
1814        use reth_ethereum_primitives::{TransactionSigned, TxType};
1815
1816        let tx_inner = TxLegacy {
1817            chain_id: Some(1),
1818            nonce: 0,
1819            gas_price: 21_000,
1820            gas_limit: 21_000,
1821            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1822            value: alloy_primitives::U256::ZERO,
1823            input: alloy_primitives::Bytes::new(),
1824        };
1825        let signature = alloy_primitives::Signature::test_signature();
1826        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1827
1828        for i in 100u64..=103 {
1829            let header = alloy_consensus::Header {
1830                number: i,
1831                parent_hash: prev_hash,
1832                // Set bloom to match filter only for blocks 100 and 102
1833                logs_bloom: if i == 100 || i == 102 {
1834                    alloy_primitives::Bloom::from([1u8; 256])
1835                } else {
1836                    alloy_primitives::Bloom::default()
1837                },
1838                ..Default::default()
1839            };
1840
1841            let hash = header.hash_slow();
1842            expected_hashes.push(hash);
1843            prev_hash = hash;
1844
1845            // Add transaction to blocks that will have receipts (100 and 102)
1846            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1847
1848            let block = reth_ethereum_primitives::Block {
1849                header,
1850                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1851            };
1852            provider.add_block(hash, block);
1853        }
1854
1855        // Add receipts with logs only to blocks that match bloom
1856        let mock_log = alloy_primitives::Log {
1857            address: alloy_primitives::Address::ZERO,
1858            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1859        };
1860
1861        let receipt = reth_ethereum_primitives::Receipt {
1862            tx_type: TxType::Legacy,
1863            cumulative_gas_used: 21_000,
1864            logs: vec![mock_log],
1865            success: true,
1866        };
1867
1868        provider.add_receipts(100, vec![receipt.clone()]);
1869        provider.add_receipts(101, vec![]);
1870        provider.add_receipts(102, vec![receipt.clone()]);
1871        provider.add_receipts(103, vec![]);
1872
1873        // Add block body indices for each block so receipts can be fetched
1874        use reth_db_api::models::StoredBlockBodyIndices;
1875        provider
1876            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1877        provider
1878            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1879        provider
1880            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1881        provider
1882            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1883
1884        let eth_api = build_test_eth_api(provider);
1885        let eth_filter = EthFilter::new(
1886            eth_api,
1887            EthFilterConfig::default(),
1888            Box::new(TokioTaskExecutor::default()),
1889        );
1890
1891        // Use default filter which will match any non-empty bloom
1892        let filter = Filter::default();
1893
1894        // Get logs in the range - this will trigger the bloom filtering
1895        let logs = eth_filter
1896            .inner
1897            .clone()
1898            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1899            .await
1900            .expect("should succeed");
1901
1902        // We should get logs from blocks 100 and 102 only (bloom filtered)
1903        assert_eq!(logs.len(), 2);
1904
1905        assert_eq!(logs[0].block_number, Some(100));
1906        assert_eq!(logs[1].block_number, Some(102));
1907
1908        // Each block hash should be the hash of its own header, not derived from any other header
1909        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
1910        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
1911    }
1912}