Skip to main content

reth_rpc/eth/
filter.rs

1//! `eth_` `Filter` RPC handler implementation
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7    BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8    PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12    future::TryFutureExt,
13    stream::{FuturesOrdered, StreamExt},
14    Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21    helpers::{EthBlocks, LoadReceipt},
22    EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23    RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27    EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31    BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32    ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::Runtime;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37    collections::{HashMap, VecDeque},
38    fmt,
39    iter::{Peekable, StepBy},
40    ops::RangeInclusive,
41    pin::Pin,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tokio::{
46    sync::{mpsc::Receiver, oneshot, Mutex},
47    time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53    Eth: FullEthApiTypes
54        + RpcNodeCoreExt<Provider: BlockIdReader>
55        + LoadReceipt
56        + EthBlocks
57        + 'static,
58{
59    /// Returns logs matching given filter object, no query limits
60    fn logs(
61        &self,
62        filter: Filter,
63        limits: QueryLimits,
64    ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65        trace!(target: "rpc::eth", "Serving eth_getLogs");
66        self.logs_for_filter(filter, limits).map_err(|e| e.into())
67    }
68}
69
/// Block-count threshold used to decide between cached mode and range mode processing.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Number of bloom filter matches above which caching is reduced.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Number of bloom filter matches above which caching is moderately reduced.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Minimum number of blocks required before bloom filter match adjustments are applied.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Upper bound on the number of headers read in one batch while handling a range filter.
///
/// With ~530 bytes per header this amounts to roughly 500kb per batch.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Threshold at which range mode switches to parallel processing.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1_000;

/// Concurrency used by default for parallel processing.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91/// `Eth` filter RPC implementation.
92///
93/// This type handles `eth_` rpc requests related to filters (`eth_getLogs`).
94pub struct EthFilter<Eth: EthApiTypes> {
95    /// All nested fields bundled together
96    inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101    Eth: EthApiTypes,
102{
103    fn clone(&self) -> Self {
104        Self { inner: self.inner.clone() }
105    }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110    Eth: EthApiTypes + 'static,
111{
112    /// Creates a new, shareable instance.
113    ///
114    /// This uses the given pool to get notified about new transactions, the provider to interact
115    /// with the blockchain, the cache to fetch cacheable data, like the logs.
116    ///
117    /// See also [`EthFilterConfig`].
118    ///
119    /// This also spawns a task that periodically clears stale filters.
120    ///
121    /// # Create a new instance with [`EthApi`](crate::EthApi)
122    ///
123    /// ```no_run
124    /// use reth_evm_ethereum::EthEvmConfig;
125    /// use reth_network_api::noop::NoopNetwork;
126    /// use reth_provider::noop::NoopProvider;
127    /// use reth_rpc::{EthApi, EthFilter};
128    /// use reth_tasks::Runtime;
129    /// use reth_transaction_pool::noop::NoopTransactionPool;
130    /// let eth_api = EthApi::builder(
131    ///     NoopProvider::default(),
132    ///     NoopTransactionPool::default(),
133    ///     NoopNetwork::default(),
134    ///     EthEvmConfig::mainnet(),
135    /// )
136    /// .build();
137    /// let filter = EthFilter::new(eth_api, Default::default(), Runtime::test());
138    /// ```
139    pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self {
140        let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141            config;
142        let inner = EthFilterInner {
143            eth_api,
144            active_filters: ActiveFilters::new(),
145            id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146            max_headers_range: MAX_HEADERS_RANGE,
147            task_spawner,
148            stale_filter_ttl,
149            query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150        };
151
152        let eth_filter = Self { inner: Arc::new(inner) };
153
154        let this = eth_filter.clone();
155        eth_filter.inner.task_spawner.spawn_critical_task(
156            "eth-filters_stale-filters-clean",
157            async move {
158                this.watch_and_clear_stale_filters().await;
159            },
160        );
161
162        eth_filter
163    }
164
165    /// Returns all currently active filters
166    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167        &self.inner.active_filters
168    }
169
170    /// Endless future that [`Self::clear_stale_filters`] every `stale_filter_ttl` interval.
171    /// Nonetheless, this endless future frees the thread at every await point.
172    async fn watch_and_clear_stale_filters(&self) {
173        let mut interval = tokio::time::interval_at(
174            tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175            self.inner.stale_filter_ttl,
176        );
177        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178        loop {
179            interval.tick().await;
180            self.clear_stale_filters(Instant::now()).await;
181        }
182    }
183
184    /// Clears all filters that have not been polled for longer than the configured
185    /// `stale_filter_ttl` at the given instant.
186    pub async fn clear_stale_filters(&self, now: Instant) {
187        trace!(target: "rpc::eth", "clear stale filters");
188        let mut filters = self.active_filters().inner.lock().await;
189        filters.retain(|id, filter| {
190            let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192            if !is_valid {
193                trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194            }
195
196            is_valid
197        });
198        filters.shrink_to_fit();
199    }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204    Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205        + RpcNodeCoreExt
206        + LoadReceipt
207        + EthBlocks
208        + 'static,
209{
210    /// Access the underlying provider.
211    fn provider(&self) -> &Eth::Provider {
212        self.inner.eth_api.provider()
213    }
214
215    /// Access the underlying pool.
216    fn pool(&self) -> &Eth::Pool {
217        self.inner.eth_api.pool()
218    }
219
220    /// Returns all the filter changes for the given id, if any
221    pub async fn filter_changes(
222        &self,
223        id: FilterId,
224    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225        let info = self.provider().chain_info()?;
226        let best_number = info.best_number;
227
228        // start_block is the block from which we should start fetching changes, the next block from
229        // the last time changes were polled, in other words the best block at last poll + 1
230        let (start_block, kind) = {
231            let mut filters = self.inner.active_filters.inner.lock().await;
232            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234            if filter.block > best_number {
235                // no new blocks since the last poll
236                return Ok(FilterChanges::Empty)
237            }
238
239            // update filter
240            // we fetch all changes from [filter.block..best_block], so we advance the filter's
241            // block to `best_block +1`, the next from which we should start fetching changes again
242            let mut block = best_number + 1;
243            std::mem::swap(&mut filter.block, &mut block);
244            filter.last_poll_timestamp = Instant::now();
245
246            (block, filter.kind.clone())
247        };
248
249        match kind {
250            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251            FilterKind::Block => {
252                // Note: we need to fetch the block hashes from inclusive range
253                // [start_block..best_block]
254                let end_block = best_number + 1;
255                let block_hashes =
256                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
257                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258                    )?;
259                Ok(FilterChanges::Hashes(block_hashes))
260            }
261            FilterKind::Log(filter) => {
262                let (from_block_number, to_block_number) = match filter.block_option {
263                    FilterBlockOption::Range { from_block, to_block } => {
264                        let from = from_block
265                            .map(|num| self.provider().convert_block_number(num))
266                            .transpose()?
267                            .flatten();
268                        let to = to_block
269                            .map(|num| self.provider().convert_block_number(num))
270                            .transpose()?
271                            .flatten();
272                        logs_utils::get_filter_block_range(from, to, start_block, info)?
273                    }
274                    FilterBlockOption::AtBlockHash(block_hash) => {
275                        // blockHash is equivalent to fromBlock = toBlock = the block number with
276                        // hash blockHash
277                        // get_logs_in_block_range is inclusive
278                        let block_number = self
279                            .provider()
280                            .block_number(block_hash)?
281                            .ok_or(ProviderError::HeaderNotFound(block_hash.into()))?;
282                        (block_number, block_number)
283                    }
284                };
285                let logs = self
286                    .inner
287                    .clone()
288                    .get_logs_in_block_range(
289                        *filter,
290                        from_block_number,
291                        to_block_number,
292                        self.inner.query_limits,
293                    )
294                    .await?;
295                Ok(FilterChanges::Logs(logs))
296            }
297        }
298    }
299
300    /// Returns an array of all logs matching filter with given id.
301    ///
302    /// Returns an error if no matching log filter exists.
303    ///
304    /// Handler for `eth_getFilterLogs`
305    pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
306        let filter = {
307            let mut filters = self.inner.active_filters.inner.lock().await;
308            let filter =
309                filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
310            if let FilterKind::Log(ref inner_filter) = filter.kind {
311                filter.last_poll_timestamp = Instant::now();
312                *inner_filter.clone()
313            } else {
314                // Not a log filter
315                return Err(EthFilterError::FilterNotFound(id))
316            }
317        };
318
319        self.logs_for_filter(filter, self.inner.query_limits).await
320    }
321
322    /// Returns logs matching given filter object.
323    async fn logs_for_filter(
324        &self,
325        filter: Filter,
326        limits: QueryLimits,
327    ) -> Result<Vec<Log>, EthFilterError> {
328        self.inner.clone().logs_for_filter(filter, limits).await
329    }
330}
331
332#[async_trait]
333impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
334where
335    Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
336{
337    /// Handler for `eth_newFilter`
338    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
339        trace!(target: "rpc::eth", "Serving eth_newFilter");
340        self.inner
341            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
342            .await
343    }
344
345    /// Handler for `eth_newBlockFilter`
346    async fn new_block_filter(&self) -> RpcResult<FilterId> {
347        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
348        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
349    }
350
351    /// Handler for `eth_newPendingTransactionFilter`
352    async fn new_pending_transaction_filter(
353        &self,
354        kind: Option<PendingTransactionFilterKind>,
355    ) -> RpcResult<FilterId> {
356        trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
357
358        let transaction_kind = match kind.unwrap_or_default() {
359            PendingTransactionFilterKind::Hashes => {
360                let receiver = self.pool().pending_transactions_listener();
361                let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
362                FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
363            }
364            PendingTransactionFilterKind::Full => {
365                let stream = self.pool().new_pending_pool_transactions_listener();
366                let full_txs_receiver = FullTransactionsReceiver::new(
367                    stream,
368                    dyn_clone::clone(self.inner.eth_api.converter()),
369                );
370                FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
371                    full_txs_receiver,
372                )))
373            }
374        };
375
376        // Install the filter and propagate any errors
377        self.inner.install_filter(transaction_kind).await
378    }
379
380    /// Handler for `eth_getFilterChanges`
381    async fn filter_changes(
382        &self,
383        id: FilterId,
384    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
385        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
386        Ok(Self::filter_changes(self, id).await?)
387    }
388
389    /// Returns an array of all logs matching filter with given id.
390    ///
391    /// Returns an error if no matching log filter exists.
392    ///
393    /// Handler for `eth_getFilterLogs`
394    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
395        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
396        Ok(Self::filter_logs(self, id).await?)
397    }
398
399    /// Handler for `eth_uninstallFilter`
400    async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
401        trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
402        let mut filters = self.inner.active_filters.inner.lock().await;
403        if filters.remove(&id).is_some() {
404            trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
405            Ok(true)
406        } else {
407            Ok(false)
408        }
409    }
410
411    /// Returns logs matching given filter object.
412    ///
413    /// Handler for `eth_getLogs`
414    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
415        trace!(target: "rpc::eth", "Serving eth_getLogs");
416        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
417    }
418}
419
420impl<Eth> std::fmt::Debug for EthFilter<Eth>
421where
422    Eth: EthApiTypes,
423{
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        f.debug_struct("EthFilter").finish_non_exhaustive()
426    }
427}
428
429/// Container type `EthFilter`
430#[derive(Debug)]
431struct EthFilterInner<Eth: EthApiTypes> {
432    /// Inner `eth` API implementation.
433    eth_api: Eth,
434    /// All currently installed filters.
435    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
436    /// Provides ids to identify filters
437    id_provider: Arc<dyn IdProvider>,
438    /// limits for logs queries
439    query_limits: QueryLimits,
440    /// maximum number of headers to read at once for range filter
441    max_headers_range: u64,
442    /// The type that can spawn tasks.
443    task_spawner: Runtime,
444    /// Duration since the last filter poll, after which the filter is considered stale
445    stale_filter_ttl: Duration,
446}
447
448impl<Eth> EthFilterInner<Eth>
449where
450    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
451        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
452        + LoadReceipt
453        + EthBlocks
454        + 'static,
455{
456    /// Access the underlying provider.
457    fn provider(&self) -> &Eth::Provider {
458        self.eth_api.provider()
459    }
460
461    /// Access the underlying [`EthStateCache`].
462    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
463        self.eth_api.cache()
464    }
465
466    /// Returns logs matching given filter object.
467    async fn logs_for_filter(
468        self: Arc<Self>,
469        filter: Filter,
470        limits: QueryLimits,
471    ) -> Result<Vec<Log>, EthFilterError> {
472        match filter.block_option {
473            FilterBlockOption::AtBlockHash(block_hash) => {
474                // First try to get cached block and receipts, as it's likely they're already cached
475                let Some((receipts, maybe_block)) =
476                    self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
477                else {
478                    return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
479                };
480
481                // Read number and timestamp from cached block or provider header
482                let (block_number, block_timestamp) = if let Some(block) = &maybe_block {
483                    (block.header().number(), block.header().timestamp())
484                } else {
485                    let header = self
486                        .provider()
487                        .header_by_hash_or_number(block_hash.into())?
488                        .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
489                    (header.number(), header.timestamp())
490                };
491
492                // Check if the block has been pruned (EIP-4444)
493                let earliest_block = self.provider().earliest_block_number()?;
494                if block_number < earliest_block {
495                    return Err(EthApiError::PrunedHistoryUnavailable.into());
496                }
497
498                let block_num_hash = BlockNumHash::new(block_number, block_hash);
499
500                let mut all_logs = Vec::new();
501                append_matching_block_logs(
502                    &mut all_logs,
503                    maybe_block
504                        .map(ProviderOrBlock::Block)
505                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
506                    &filter,
507                    block_num_hash,
508                    &receipts,
509                    false,
510                    block_timestamp,
511                )?;
512
513                Ok(all_logs)
514            }
515            FilterBlockOption::Range { from_block, to_block } => {
516                // Handle special case where from block is pending
517                if from_block.is_some_and(|b| b.is_pending()) {
518                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
519                    if !(to_block.is_pending() || to_block.is_number()) {
520                        // always empty range
521                        return Ok(Vec::new());
522                    }
523                    // Try to get pending block and receipts
524                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
525                        if let BlockNumberOrTag::Number(to_block) = to_block &&
526                            to_block < pending_block.block.number()
527                        {
528                            // this block range is empty based on the user input
529                            return Ok(Vec::new());
530                        }
531
532                        let info = self.provider().chain_info()?;
533                        if pending_block.block.number() > info.best_number {
534                            // only consider the pending block if it is ahead of the chain
535                            let mut all_logs = Vec::new();
536                            let timestamp = pending_block.block.timestamp();
537                            let block_num_hash = pending_block.block.num_hash();
538                            append_matching_block_logs(
539                                &mut all_logs,
540                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
541                                &filter,
542                                block_num_hash,
543                                &pending_block.receipts,
544                                false, // removed = false for pending blocks
545                                timestamp,
546                            )?;
547                            return Ok(all_logs);
548                        }
549                    }
550                }
551
552                let info = self.provider().chain_info()?;
553                let start_block = info.best_number;
554                let from = from_block
555                    .map(|num| self.provider().convert_block_number(num))
556                    .transpose()?
557                    .flatten();
558                let to = to_block
559                    .map(|num| self.provider().convert_block_number(num))
560                    .transpose()?
561                    .flatten();
562
563                // Return error if toBlock exceeds current head
564                if let Some(t) = to &&
565                    t > info.best_number
566                {
567                    return Err(EthFilterError::BlockRangeExceedsHead {
568                        requested: t,
569                        head: info.best_number,
570                    });
571                }
572
573                if let Some(f) = from &&
574                    f > info.best_number
575                {
576                    // start block higher than local head, can return empty
577                    return Ok(Vec::new());
578                }
579
580                let (from_block_number, to_block_number) =
581                    logs_utils::get_filter_block_range(from, to, start_block, info)?;
582
583                // Check if the requested range overlaps with pruned history (EIP-4444)
584                let earliest_block = self.provider().earliest_block_number()?;
585                if from_block_number < earliest_block {
586                    return Err(EthApiError::PrunedHistoryUnavailable.into());
587                }
588
589                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
590                    .await
591            }
592        }
593    }
594
595    /// Installs a new filter and returns the new identifier.
596    async fn install_filter(
597        &self,
598        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
599    ) -> RpcResult<FilterId> {
600        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
601        let subscription_id = self.id_provider.next_id();
602
603        let id = match subscription_id {
604            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
605            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
606        };
607        let mut filters = self.active_filters.inner.lock().await;
608        filters.insert(
609            id.clone(),
610            ActiveFilter {
611                block: last_poll_block_number,
612                last_poll_timestamp: Instant::now(),
613                kind,
614            },
615        );
616        Ok(id)
617    }
618
619    /// Returns all logs in the given _inclusive_ range that match the filter
620    ///
621    /// Returns an error if:
622    ///  - underlying database error
623    ///  - amount of matches exceeds configured limit
624    async fn get_logs_in_block_range(
625        self: Arc<Self>,
626        filter: Filter,
627        from_block: u64,
628        to_block: u64,
629        limits: QueryLimits,
630    ) -> Result<Vec<Log>, EthFilterError> {
631        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
632
633        // perform boundary checks first
634        if to_block < from_block {
635            return Err(EthFilterError::InvalidBlockRangeParams)
636        }
637
638        if let Some(max_blocks_per_filter) =
639            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
640        {
641            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
642        }
643
644        let (tx, rx) = oneshot::channel();
645        let this = self.clone();
646        self.task_spawner.spawn_blocking_task(async move {
647            let res =
648                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
649            let _ = tx.send(res);
650        });
651
652        rx.await.map_err(|_| EthFilterError::InternalError)?
653    }
654
655    /// Returns all logs in the given _inclusive_ range that match the filter
656    ///
657    /// Note: This function uses a mix of blocking db operations for fetching indices and header
658    /// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
659    /// This function is considered blocking and should thus be spawned on a blocking task.
660    ///
661    /// Returns an error if:
662    ///  - underlying database error
663    async fn get_logs_in_block_range_inner(
664        self: Arc<Self>,
665        filter: &Filter,
666        from_block: u64,
667        to_block: u64,
668        limits: QueryLimits,
669    ) -> Result<Vec<Log>, EthFilterError> {
670        let mut all_logs = Vec::new();
671        let mut matching_headers = Vec::new();
672
673        // get current chain tip to determine processing mode
674        let chain_tip = self.provider().best_block_number()?;
675
676        // first collect all headers that match the bloom filter for cached mode decision
677        for (from, to) in
678            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
679        {
680            let headers = self.provider().headers_range(from..=to)?;
681
682            let mut headers_iter = headers.into_iter().peekable();
683
684            while let Some(header) = headers_iter.next() {
685                if !filter.matches_bloom(header.logs_bloom()) {
686                    continue
687                }
688
689                let current_number = header.number();
690
691                let block_hash = match headers_iter.peek() {
692                    Some(next_header) if next_header.number() == current_number + 1 => {
693                        // Headers are consecutive, use the more efficient parent_hash
694                        next_header.parent_hash()
695                    }
696                    _ => {
697                        // Headers not consecutive or last header, calculate hash
698                        header.hash_slow()
699                    }
700                };
701
702                matching_headers.push(SealedHeader::new(header, block_hash));
703            }
704        }
705
706        // initialize the appropriate range mode based on collected headers
707        let mut range_mode = RangeMode::new(
708            self.clone(),
709            matching_headers,
710            from_block,
711            to_block,
712            self.max_headers_range,
713            chain_tip,
714        );
715
716        // iterate through the range mode to get receipts and blocks
717        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
718            range_mode.next().await?
719        {
720            let num_hash = header.num_hash();
721            append_matching_block_logs(
722                &mut all_logs,
723                recovered_block
724                    .map(ProviderOrBlock::Block)
725                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
726                filter,
727                num_hash,
728                &receipts,
729                false,
730                header.timestamp(),
731            )?;
732
733            // size check but only if range is multiple blocks, so we always return all
734            // logs of a single block
735            let is_multi_block_range = from_block != to_block;
736            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
737                is_multi_block_range &&
738                all_logs.len() > max_logs_per_response
739            {
740                let retry_to_block =
741                    if num_hash.number == from_block { from_block } else { num_hash.number - 1 };
742
743                debug!(
744                    target: "rpc::eth::filter",
745                    logs_found = all_logs.len(),
746                    max_logs_per_response,
747                    from_block,
748                    to_block = retry_to_block,
749                    "Query exceeded max logs per response limit"
750                );
751                return Err(EthFilterError::QueryExceedsMaxResults {
752                    max_logs: max_logs_per_response,
753                    from_block,
754                    to_block: retry_to_block,
755                });
756            }
757        }
758
759        Ok(all_logs)
760    }
761}
762
763/// All active filters
764#[derive(Debug, Clone, Default)]
765pub struct ActiveFilters<T> {
766    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
767}
768
769impl<T> ActiveFilters<T> {
770    /// Returns an empty instance.
771    pub fn new() -> Self {
772        Self { inner: Arc::new(Mutex::new(HashMap::default())) }
773    }
774
775    /// Returns `true` if a filter with the given id exists.
776    pub async fn contains(&self, id: &FilterId) -> bool {
777        self.inner.lock().await.contains_key(id)
778    }
779
780    /// Returns the number of currently active filters.
781    pub async fn len(&self) -> usize {
782        self.inner.lock().await.len()
783    }
784
785    /// Returns `true` if there are no active filters.
786    pub async fn is_empty(&self) -> bool {
787        self.inner.lock().await.is_empty()
788    }
789
790    /// Returns all active filter ids.
791    pub async fn ids(&self) -> Vec<FilterId> {
792        self.inner.lock().await.keys().cloned().collect()
793    }
794}
795
796/// An installed filter
797#[derive(Debug)]
798struct ActiveFilter<T> {
799    /// At which block the filter was polled last.
800    block: u64,
801    /// Last time this filter was polled.
802    last_poll_timestamp: Instant,
803    /// What kind of filter it is.
804    kind: FilterKind<T>,
805}
806
807/// A receiver for pending transactions that returns all new transactions since the last poll.
808#[derive(Debug, Clone)]
809struct PendingTransactionsReceiver {
810    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
811}
812
813impl PendingTransactionsReceiver {
814    fn new(receiver: Receiver<TxHash>) -> Self {
815        Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
816    }
817
818    /// Returns all new pending transactions received since the last poll.
819    async fn drain<T>(&self) -> FilterChanges<T> {
820        let mut pending_txs = Vec::new();
821        let mut prepared_stream = self.txs_receiver.lock().await;
822
823        while let Ok(tx_hash) = prepared_stream.try_recv() {
824            pending_txs.push(tx_hash);
825        }
826
827        // Convert the vector of hashes into FilterChanges::Hashes
828        FilterChanges::Hashes(pending_txs)
829    }
830}
831
832/// A structure to manage and provide access to a stream of full transaction details.
833#[derive(Debug, Clone)]
834struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
835    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
836    converter: TxCompat,
837}
838
839impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
840where
841    T: PoolTransaction + 'static,
842    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
843{
844    /// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
845    fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
846        Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
847    }
848
849    /// Returns all new pending transactions received since the last poll.
850    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
851        let mut pending_txs = Vec::new();
852        let mut prepared_stream = self.txs_stream.lock().await;
853
854        while let Ok(tx) = prepared_stream.try_recv() {
855            match self.converter.fill_pending(tx.transaction.to_consensus()) {
856                Ok(tx) => pending_txs.push(tx),
857                Err(err) => {
858                    error!(target: "rpc",
859                        %err,
860                        "Failed to fill txn with block context"
861                    );
862                }
863            }
864        }
865        FilterChanges::Transactions(pending_txs)
866    }
867}
868
869/// Helper trait for [`FullTransactionsReceiver`] to erase the `Transaction` type.
870#[async_trait]
871trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
872    async fn drain(&self) -> FilterChanges<T>;
873}
874
875#[async_trait]
876impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
877    for FullTransactionsReceiver<T, TxCompat>
878where
879    T: PoolTransaction + 'static,
880    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
881{
882    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
883        Self::drain(self).await
884    }
885}
886
887/// Represents the kind of pending transaction data that can be retrieved.
888///
889/// This enum differentiates between two kinds of pending transaction data:
890/// - Just the transaction hashes.
891/// - Full transaction details.
892#[derive(Debug, Clone)]
893enum PendingTransactionKind<T> {
894    Hashes(PendingTransactionsReceiver),
895    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
896}
897
898impl<T: 'static> PendingTransactionKind<T> {
899    async fn drain(&self) -> FilterChanges<T> {
900        match self {
901            Self::Hashes(receiver) => receiver.drain().await,
902            Self::FullTransaction(receiver) => receiver.drain().await,
903        }
904    }
905}
906
907#[derive(Clone, Debug)]
908enum FilterKind<T> {
909    Log(Box<Filter>),
910    Block,
911    PendingTransaction(PendingTransactionKind<T>),
912}
913
/// An iterator that yields _inclusive_ block ranges of a given step size.
///
/// Every item is a `(start, end)` pair spanning at most `step + 1` blocks. Consecutive items are
/// contiguous (`next.start == prev.end + 1`) and the last item is clamped to the end of the
/// range the iterator was created with.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Produces the first block of each sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block of the overall range; no sub-range extends past it.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Splits `range` into inclusive sub-ranges whose first and last block are at most `step`
    /// apart.
    ///
    /// A `step` of `0` yields one single-block range per block. A `step` too large for the
    /// platform's stride (`step + 1` does not fit in `usize`) is clamped instead of overflowing
    /// or being truncated.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // The stride between two sub-range starts is `step + 1`; saturate at `usize::MAX`.
        let stride =
            usize::try_from(step).ok().and_then(|s| s.checked_add(1)).unwrap_or(usize::MAX);
        // Derive the effective step from the stride so that sub-ranges stay contiguous even
        // when the stride was clamped. `usize` -> `u64` is lossless and `stride >= 1`.
        let step = stride as u64 - 1;

        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate so a start close to `u64::MAX` cannot overflow. `start` comes from the
        // range, hence `start <= self.end` and the resulting `end` is never below `start`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
940
941/// Errors that can occur in the handler implementation
942#[derive(Debug, thiserror::Error)]
943pub enum EthFilterError {
944    /// Filter not found.
945    #[error("filter not found")]
946    FilterNotFound(FilterId),
947    /// Invalid block range.
948    #[error("invalid block range params")]
949    InvalidBlockRangeParams,
950    /// Block range extends beyond current head.
951    #[error("block range extends beyond current head block: requested {requested}, head {head}")]
952    BlockRangeExceedsHead {
953        /// The requested `toBlock` number
954        requested: u64,
955        /// The current head block number
956        head: u64,
957    },
958    /// Query scope is too broad.
959    #[error("query exceeds max block range {0}")]
960    QueryExceedsMaxBlocks(u64),
961    /// Query result is too large.
962    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
963    QueryExceedsMaxResults {
964        /// Maximum number of logs allowed per response
965        max_logs: usize,
966        /// Start block of the suggested retry range
967        from_block: u64,
968        /// End block of the suggested retry range (last successfully processed block)
969        to_block: u64,
970    },
971    /// Error serving request in `eth_` namespace.
972    #[error(transparent)]
973    EthAPIError(#[from] EthApiError),
974    /// Error thrown when a spawned task failed to deliver a response.
975    #[error("internal filter error")]
976    InternalError,
977}
978
979impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
980    fn from(err: EthFilterError) -> Self {
981        match err {
982            EthFilterError::FilterNotFound(_) => rpc_error_with_code(
983                jsonrpsee::types::error::INVALID_PARAMS_CODE,
984                "filter not found",
985            ),
986            err @ EthFilterError::InternalError => {
987                rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
988            }
989            EthFilterError::EthAPIError(err) => err.into(),
990            err @ (EthFilterError::InvalidBlockRangeParams |
991            EthFilterError::QueryExceedsMaxBlocks(_) |
992            EthFilterError::QueryExceedsMaxResults { .. } |
993            EthFilterError::BlockRangeExceedsHead { .. }) => {
994                rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
995            }
996        }
997    }
998}
999
1000impl From<ProviderError> for EthFilterError {
1001    fn from(err: ProviderError) -> Self {
1002        Self::EthAPIError(err.into())
1003    }
1004}
1005
1006impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
1007    fn from(err: logs_utils::FilterBlockRangeError) -> Self {
1008        match err {
1009            logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
1010            logs_utils::FilterBlockRangeError::BlockRangeExceedsHead { requested, head } => {
1011                Self::BlockRangeExceedsHead { requested, head }
1012            }
1013        }
1014    }
1015}
1016
1017/// Helper type for the common pattern of returning receipts, block and the original header that is
1018/// a match for the filter.
1019struct ReceiptBlockResult<P>
1020where
1021    P: ReceiptProvider + BlockReader,
1022{
1023    /// We always need the entire receipts for the matching block.
1024    receipts: Arc<Vec<ProviderReceipt<P>>>,
1025    /// Block can be optional and we can fetch it lazily when needed.
1026    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
1027    /// The header of the block.
1028    header: SealedHeader<<P as HeaderProvider>::Header>,
1029}
1030
1031/// Represents different modes for processing block ranges when filtering logs
1032enum RangeMode<
1033    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1034        + EthApiTypes
1035        + LoadReceipt
1036        + EthBlocks
1037        + 'static,
1038> {
1039    /// Use cache-based processing for recent blocks
1040    Cached(CachedMode<Eth>),
1041    /// Use range-based processing for older blocks
1042    Range(RangeBlockMode<Eth>),
1043}
1044
1045impl<
1046        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1047            + EthApiTypes
1048            + LoadReceipt
1049            + EthBlocks
1050            + 'static,
1051    > RangeMode<Eth>
1052{
1053    /// Creates a new `RangeMode`.
1054    fn new(
1055        filter_inner: Arc<EthFilterInner<Eth>>,
1056        sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1057        from_block: u64,
1058        to_block: u64,
1059        max_headers_range: u64,
1060        chain_tip: u64,
1061    ) -> Self {
1062        let block_count = to_block - from_block + 1;
1063        let distance_from_tip = chain_tip.saturating_sub(to_block);
1064
1065        // Determine if we should use cached mode based on range characteristics
1066        let use_cached_mode =
1067            Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1068
1069        if use_cached_mode && !sealed_headers.is_empty() {
1070            Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1071        } else {
1072            Self::Range(RangeBlockMode {
1073                filter_inner,
1074                iter: sealed_headers.into_iter().peekable(),
1075                next: VecDeque::new(),
1076                max_range: max_headers_range as usize,
1077                pending_tasks: FuturesOrdered::new(),
1078            })
1079        }
1080    }
1081
1082    /// Determines whether to use cached mode based on bloom filter matches and range size
1083    const fn should_use_cached_mode(
1084        headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1085        block_count: u64,
1086        distance_from_tip: u64,
1087    ) -> bool {
1088        // Headers are already filtered by bloom, so count equals length
1089        let bloom_matches = headers.len();
1090
1091        // Calculate adjusted threshold based on bloom matches
1092        let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1093
1094        block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1095    }
1096
1097    /// Calculates the adjusted cache threshold based on bloom filter matches
1098    const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1099        // Only apply adjustments for larger ranges
1100        if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1101            return CACHED_MODE_BLOCK_THRESHOLD;
1102        }
1103
1104        match bloom_matches {
1105            n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1106            n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1107            _ => CACHED_MODE_BLOCK_THRESHOLD,
1108        }
1109    }
1110
1111    /// Gets the next (receipts, `maybe_block`, header, `block_hash`) tuple.
1112    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1113        match self {
1114            Self::Cached(cached) => cached.next().await,
1115            Self::Range(range) => range.next().await,
1116        }
1117    }
1118}
1119
1120/// Mode for processing blocks using cache optimization for recent blocks
1121struct CachedMode<
1122    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1123        + EthApiTypes
1124        + LoadReceipt
1125        + EthBlocks
1126        + 'static,
1127> {
1128    filter_inner: Arc<EthFilterInner<Eth>>,
1129    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1130}
1131
1132impl<
1133        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1134            + EthApiTypes
1135            + LoadReceipt
1136            + EthBlocks
1137            + 'static,
1138    > CachedMode<Eth>
1139{
1140    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1141        for header in self.headers_iter.by_ref() {
1142            // Use get_receipts_and_maybe_block which has automatic fallback to provider
1143            if let Some((receipts, maybe_block)) =
1144                self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1145            {
1146                return Ok(Some(ReceiptBlockResult {
1147                    receipts,
1148                    recovered_block: maybe_block,
1149                    header,
1150                }));
1151            }
1152        }
1153
1154        Ok(None) // No more headers
1155    }
1156}
1157
1158/// Type alias for parallel receipt fetching task futures used in `RangeBlockMode`
1159type ReceiptFetchFuture<P> =
1160    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1161
1162/// Mode for processing blocks using range queries for older blocks
1163struct RangeBlockMode<
1164    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1165        + EthApiTypes
1166        + LoadReceipt
1167        + EthBlocks
1168        + 'static,
1169> {
1170    filter_inner: Arc<EthFilterInner<Eth>>,
1171    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1172    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1173    max_range: usize,
1174    // Stream of ongoing receipt fetching tasks
1175    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1176}
1177
1178impl<
1179        Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1180            + EthApiTypes
1181            + LoadReceipt
1182            + EthBlocks
1183            + 'static,
1184    > RangeBlockMode<Eth>
1185{
1186    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1187        loop {
1188            // First, try to return any already processed result from buffer
1189            if let Some(result) = self.next.pop_front() {
1190                return Ok(Some(result));
1191            }
1192
1193            // Try to get a completed task result if there are pending tasks
1194            if let Some(task_result) = self.pending_tasks.next().await {
1195                self.next.extend(task_result?);
1196                continue;
1197            }
1198
1199            // No pending tasks - try to generate more work
1200            let Some(next_header) = self.iter.next() else {
1201                // No more headers to process
1202                return Ok(None);
1203            };
1204
1205            let mut range_headers = Vec::with_capacity(self.max_range);
1206            range_headers.push(next_header);
1207
1208            // Collect consecutive blocks up to max_range size
1209            while range_headers.len() < self.max_range {
1210                let Some(peeked) = self.iter.peek() else { break };
1211                let Some(last_header) = range_headers.last() else { break };
1212
1213                let expected_next = last_header.number() + 1;
1214                if peeked.number() != expected_next {
1215                    trace!(
1216                        target: "rpc::eth::filter",
1217                        last_block = last_header.number(),
1218                        next_block = peeked.number(),
1219                        expected = expected_next,
1220                        range_size = range_headers.len(),
1221                        "Non-consecutive block detected, stopping range collection"
1222                    );
1223                    break; // Non-consecutive block, stop here
1224                }
1225
1226                let Some(next_header) = self.iter.next() else { break };
1227                range_headers.push(next_header);
1228            }
1229
1230            // Check if we should use parallel processing for large ranges
1231            let remaining_headers = self.iter.len() + range_headers.len();
1232            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1233                self.spawn_parallel_tasks(range_headers);
1234                // Continue loop to await the spawned tasks
1235            } else {
1236                // Process small range sequentially and add results to buffer
1237                if let Some(result) = self.process_small_range(range_headers).await? {
1238                    return Ok(Some(result));
1239                }
1240                // Continue loop to check for more work
1241            }
1242        }
1243    }
1244
1245    /// Process a small range of headers sequentially
1246    ///
1247    /// This is used when the remaining headers count is below [`PARALLEL_PROCESSING_THRESHOLD`].
1248    async fn process_small_range(
1249        &mut self,
1250        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1251    ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1252        // Process each header individually to avoid queuing for all receipts
1253        for header in range_headers {
1254            // First check if already cached to avoid unnecessary provider calls
1255            let (maybe_block, maybe_receipts) = self
1256                .filter_inner
1257                .eth_cache()
1258                .maybe_cached_block_and_receipts(header.hash())
1259                .await?;
1260
1261            let receipts = match maybe_receipts {
1262                Some(receipts) => receipts,
1263                None => {
1264                    // Not cached - fetch directly from provider
1265                    match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1266                        Some(receipts) => Arc::new(receipts),
1267                        None => continue, // No receipts found
1268                    }
1269                }
1270            };
1271
1272            if !receipts.is_empty() {
1273                self.next.push_back(ReceiptBlockResult {
1274                    receipts,
1275                    recovered_block: maybe_block,
1276                    header,
1277                });
1278            }
1279        }
1280
1281        Ok(self.next.pop_front())
1282    }
1283
1284    /// Spawn parallel tasks for processing a large range of headers
1285    ///
1286    /// This is used when the remaining headers count is at or above
1287    /// [`PARALLEL_PROCESSING_THRESHOLD`].
1288    fn spawn_parallel_tasks(
1289        &mut self,
1290        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1291    ) {
1292        // Split headers into chunks
1293        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1294        let header_chunks = range_headers
1295            .into_iter()
1296            .chunks(chunk_size)
1297            .into_iter()
1298            .map(|chunk| chunk.collect::<Vec<_>>())
1299            .collect::<Vec<_>>();
1300
1301        // Spawn each chunk as a separate task directly into the FuturesOrdered stream
1302        for chunk_headers in header_chunks {
1303            let filter_inner = self.filter_inner.clone();
1304            let chunk_task = Box::pin(async move {
1305                let chunk_task = tokio::task::spawn_blocking(move || {
1306                    let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1307
1308                    for header in chunk_headers {
1309                        // Fetch directly from provider - RangeMode is used for older blocks
1310                        // unlikely to be cached
1311                        let receipts = match filter_inner
1312                            .provider()
1313                            .receipts_by_block(header.hash().into())?
1314                        {
1315                            Some(receipts) => Arc::new(receipts),
1316                            None => continue, // No receipts found
1317                        };
1318
1319                        if !receipts.is_empty() {
1320                            chunk_results.push(ReceiptBlockResult {
1321                                receipts,
1322                                recovered_block: None,
1323                                header,
1324                            });
1325                        }
1326                    }
1327
1328                    Ok(chunk_results)
1329                });
1330
1331                // Await the blocking task and handle the result
1332                match chunk_task.await {
1333                    Ok(Ok(chunk_results)) => Ok(chunk_results),
1334                    Ok(Err(e)) => Err(e),
1335                    Err(join_err) => {
1336                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1337                        Err(EthFilterError::InternalError)
1338                    }
1339                }
1340            });
1341
1342            self.pending_tasks.push_back(chunk_task);
1343        }
1344    }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349    use super::*;
1350    use crate::{eth::EthApi, EthApiBuilder};
1351    use alloy_network::Ethereum;
1352    use alloy_primitives::FixedBytes;
1353    use rand::Rng;
1354    use reth_chainspec::{ChainSpec, ChainSpecProvider};
1355    use reth_ethereum_primitives::TxType;
1356    use reth_evm_ethereum::EthEvmConfig;
1357    use reth_network_api::noop::NoopNetwork;
1358    use reth_provider::test_utils::MockEthProvider;
1359    use reth_rpc_convert::RpcConverter;
1360    use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1361    use reth_rpc_eth_types::receipt::EthReceiptConverter;
1362    use reth_tasks::Runtime;
1363    use reth_testing_utils::generators;
1364    use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1365    use std::{collections::VecDeque, sync::Arc};
1366
1367    #[test]
1368    fn test_block_range_iter() {
1369        let mut rng = generators::rng();
1370
1371        let start = rng.random::<u32>() as u64;
1372        let end = start.saturating_add(rng.random::<u32>() as u64);
1373        let step = rng.random::<u16>() as u64;
1374        let range = start..=end;
1375        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1376        let (from, mut end) = iter.next().unwrap();
1377        assert_eq!(from, start);
1378        assert_eq!(end, (from + step).min(*range.end()));
1379
1380        for (next_from, next_end) in iter {
1381            // ensure range starts with previous end + 1
1382            assert_eq!(next_from, end + 1);
1383            end = next_end;
1384        }
1385
1386        assert_eq!(end, *range.end());
1387    }
1388
1389    // Helper function to create a test EthApi instance
1390    #[expect(clippy::type_complexity)]
1391    fn build_test_eth_api(
1392        provider: MockEthProvider,
1393    ) -> EthApi<
1394        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1395        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1396    > {
1397        EthApiBuilder::new(
1398            provider.clone(),
1399            testing_pool(),
1400            NoopNetwork::default(),
1401            EthEvmConfig::new(provider.chain_spec()),
1402        )
1403        .build()
1404    }
1405
1406    #[tokio::test]
1407    async fn test_range_block_mode_empty_range() {
1408        let provider = MockEthProvider::default();
1409        let eth_api = build_test_eth_api(provider);
1410
1411        let eth_filter =
1412            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1413        let filter_inner = eth_filter.inner;
1414
1415        let headers = vec![];
1416        let max_range = 100;
1417
1418        let mut range_mode = RangeBlockMode {
1419            filter_inner,
1420            iter: headers.into_iter().peekable(),
1421            next: VecDeque::new(),
1422            max_range,
1423            pending_tasks: FuturesOrdered::new(),
1424        };
1425
1426        let result = range_mode.next().await;
1427        assert!(result.is_ok());
1428        assert!(result.unwrap().is_none());
1429    }
1430
1431    #[tokio::test]
1432    async fn test_range_block_mode_queued_results_priority() {
1433        let provider = MockEthProvider::default();
1434        let eth_api = build_test_eth_api(provider);
1435
1436        let eth_filter =
1437            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1438        let filter_inner = eth_filter.inner;
1439
1440        let headers = vec![
1441            SealedHeader::new(
1442                alloy_consensus::Header { number: 100, ..Default::default() },
1443                FixedBytes::random(),
1444            ),
1445            SealedHeader::new(
1446                alloy_consensus::Header { number: 101, ..Default::default() },
1447                FixedBytes::random(),
1448            ),
1449        ];
1450
1451        // create specific mock results to test ordering
1452        let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1453        let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1454
1455        // create mock receipts to test receipt handling
1456        let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1457            tx_type: TxType::Legacy,
1458            cumulative_gas_used: 100_000,
1459            logs: vec![],
1460            success: true,
1461        };
1462        let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1463            tx_type: TxType::Eip1559,
1464            cumulative_gas_used: 200_000,
1465            logs: vec![],
1466            success: true,
1467        };
1468        let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1469            tx_type: TxType::Eip2930,
1470            cumulative_gas_used: 150_000,
1471            logs: vec![],
1472            success: false, // Different success status
1473        };
1474
1475        let mock_result_1 = ReceiptBlockResult {
1476            receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1477            recovered_block: None,
1478            header: SealedHeader::new(
1479                alloy_consensus::Header { number: 42, ..Default::default() },
1480                expected_block_hash_1,
1481            ),
1482        };
1483
1484        let mock_result_2 = ReceiptBlockResult {
1485            receipts: Arc::new(vec![mock_receipt_3.clone()]),
1486            recovered_block: None,
1487            header: SealedHeader::new(
1488                alloy_consensus::Header { number: 43, ..Default::default() },
1489                expected_block_hash_2,
1490            ),
1491        };
1492
1493        let mut range_mode = RangeBlockMode {
1494            filter_inner,
1495            iter: headers.into_iter().peekable(),
1496            next: VecDeque::from([mock_result_1, mock_result_2]), // Queue two results
1497            max_range: 100,
1498            pending_tasks: FuturesOrdered::new(),
1499        };
1500
1501        // first call should return the first queued result (FIFO order)
1502        let result1 = range_mode.next().await;
1503        assert!(result1.is_ok());
1504        let receipt_result1 = result1.unwrap().unwrap();
1505        assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1506        assert_eq!(receipt_result1.header.number, 42);
1507
1508        // verify receipts
1509        assert_eq!(receipt_result1.receipts.len(), 2);
1510        assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1511        assert_eq!(
1512            receipt_result1.receipts[0].cumulative_gas_used,
1513            mock_receipt_1.cumulative_gas_used
1514        );
1515        assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1516        assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1517        assert_eq!(
1518            receipt_result1.receipts[1].cumulative_gas_used,
1519            mock_receipt_2.cumulative_gas_used
1520        );
1521        assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1522
1523        // second call should return the second queued result
1524        let result2 = range_mode.next().await;
1525        assert!(result2.is_ok());
1526        let receipt_result2 = result2.unwrap().unwrap();
1527        assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1528        assert_eq!(receipt_result2.header.number, 43);
1529
1530        // verify receipts
1531        assert_eq!(receipt_result2.receipts.len(), 1);
1532        assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1533        assert_eq!(
1534            receipt_result2.receipts[0].cumulative_gas_used,
1535            mock_receipt_3.cumulative_gas_used
1536        );
1537        assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1538
1539        // queue should now be empty
1540        assert!(range_mode.next.is_empty());
1541
1542        let result3 = range_mode.next().await;
1543        assert!(result3.is_ok());
1544    }
1545
1546    #[tokio::test]
1547    async fn test_range_block_mode_single_block_no_receipts() {
1548        let provider = MockEthProvider::default();
1549        let eth_api = build_test_eth_api(provider);
1550
1551        let eth_filter =
1552            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1553        let filter_inner = eth_filter.inner;
1554
1555        let headers = vec![SealedHeader::new(
1556            alloy_consensus::Header { number: 100, ..Default::default() },
1557            FixedBytes::random(),
1558        )];
1559
1560        let mut range_mode = RangeBlockMode {
1561            filter_inner,
1562            iter: headers.into_iter().peekable(),
1563            next: VecDeque::new(),
1564            max_range: 100,
1565            pending_tasks: FuturesOrdered::new(),
1566        };
1567
1568        let result = range_mode.next().await;
1569        assert!(result.is_ok());
1570    }
1571
1572    #[tokio::test]
1573    async fn test_range_block_mode_provider_receipts() {
1574        let provider = MockEthProvider::default();
1575
1576        let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1577        let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1578        let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1579
1580        let block_hash_1 = FixedBytes::random();
1581        let block_hash_2 = FixedBytes::random();
1582        let block_hash_3 = FixedBytes::random();
1583
1584        provider.add_header(block_hash_1, header_1.clone());
1585        provider.add_header(block_hash_2, header_2.clone());
1586        provider.add_header(block_hash_3, header_3.clone());
1587
1588        // create mock receipts to test provider fetching with mock logs
1589        let mock_log = alloy_primitives::Log {
1590            address: alloy_primitives::Address::ZERO,
1591            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1592        };
1593
1594        let receipt_100_1 = reth_ethereum_primitives::Receipt {
1595            tx_type: TxType::Legacy,
1596            cumulative_gas_used: 21_000,
1597            logs: vec![mock_log.clone()],
1598            success: true,
1599        };
1600        let receipt_100_2 = reth_ethereum_primitives::Receipt {
1601            tx_type: TxType::Eip1559,
1602            cumulative_gas_used: 42_000,
1603            logs: vec![mock_log.clone()],
1604            success: true,
1605        };
1606        let receipt_101_1 = reth_ethereum_primitives::Receipt {
1607            tx_type: TxType::Eip2930,
1608            cumulative_gas_used: 30_000,
1609            logs: vec![mock_log.clone()],
1610            success: false,
1611        };
1612
1613        provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1614        provider.add_receipts(101, vec![receipt_101_1.clone()]);
1615
1616        let eth_api = build_test_eth_api(provider);
1617
1618        let eth_filter =
1619            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1620        let filter_inner = eth_filter.inner;
1621
1622        let headers = vec![
1623            SealedHeader::new(header_1, block_hash_1),
1624            SealedHeader::new(header_2, block_hash_2),
1625            SealedHeader::new(header_3, block_hash_3),
1626        ];
1627
1628        let mut range_mode = RangeBlockMode {
1629            filter_inner,
1630            iter: headers.into_iter().peekable(),
1631            next: VecDeque::new(),
1632            max_range: 3, // include the 3 blocks in the first queried results
1633            pending_tasks: FuturesOrdered::new(),
1634        };
1635
1636        // first call should fetch receipts from provider and return first block with receipts
1637        let result = range_mode.next().await;
1638        assert!(result.is_ok());
1639        let receipt_result = result.unwrap().unwrap();
1640
1641        assert_eq!(receipt_result.header.hash(), block_hash_1);
1642        assert_eq!(receipt_result.header.number, 100);
1643        assert_eq!(receipt_result.receipts.len(), 2);
1644
1645        // verify receipts
1646        assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1647        assert_eq!(
1648            receipt_result.receipts[0].cumulative_gas_used,
1649            receipt_100_1.cumulative_gas_used
1650        );
1651        assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1652
1653        assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1654        assert_eq!(
1655            receipt_result.receipts[1].cumulative_gas_used,
1656            receipt_100_2.cumulative_gas_used
1657        );
1658        assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1659
1660        // second call should return the second block with receipts
1661        let result2 = range_mode.next().await;
1662        assert!(result2.is_ok());
1663        let receipt_result2 = result2.unwrap().unwrap();
1664
1665        assert_eq!(receipt_result2.header.hash(), block_hash_2);
1666        assert_eq!(receipt_result2.header.number, 101);
1667        assert_eq!(receipt_result2.receipts.len(), 1);
1668
1669        // verify receipts
1670        assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1671        assert_eq!(
1672            receipt_result2.receipts[0].cumulative_gas_used,
1673            receipt_101_1.cumulative_gas_used
1674        );
1675        assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1676
1677        // third call should return None since no more blocks with receipts
1678        let result3 = range_mode.next().await;
1679        assert!(result3.is_ok());
1680        assert!(result3.unwrap().is_none());
1681    }
1682
1683    #[tokio::test]
1684    async fn test_range_block_mode_iterator_exhaustion() {
1685        let provider = MockEthProvider::default();
1686
1687        let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1688        let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1689
1690        let block_hash_100 = FixedBytes::random();
1691        let block_hash_101 = FixedBytes::random();
1692
1693        // Associate headers with hashes first
1694        provider.add_header(block_hash_100, header_100.clone());
1695        provider.add_header(block_hash_101, header_101.clone());
1696
1697        // Add mock receipts so headers are actually processed
1698        let mock_receipt = reth_ethereum_primitives::Receipt {
1699            tx_type: TxType::Legacy,
1700            cumulative_gas_used: 21_000,
1701            logs: vec![],
1702            success: true,
1703        };
1704        provider.add_receipts(100, vec![mock_receipt.clone()]);
1705        provider.add_receipts(101, vec![mock_receipt.clone()]);
1706
1707        let eth_api = build_test_eth_api(provider);
1708
1709        let eth_filter =
1710            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1711        let filter_inner = eth_filter.inner;
1712
1713        let headers = vec![
1714            SealedHeader::new(header_100, block_hash_100),
1715            SealedHeader::new(header_101, block_hash_101),
1716        ];
1717
1718        let mut range_mode = RangeBlockMode {
1719            filter_inner,
1720            iter: headers.into_iter().peekable(),
1721            next: VecDeque::new(),
1722            max_range: 1,
1723            pending_tasks: FuturesOrdered::new(),
1724        };
1725
1726        let result1 = range_mode.next().await;
1727        assert!(result1.is_ok());
1728        assert!(result1.unwrap().is_some()); // Should have processed block 100
1729
1730        assert!(range_mode.iter.peek().is_some()); // Should still have block 101
1731
1732        let result2 = range_mode.next().await;
1733        assert!(result2.is_ok());
1734        assert!(result2.unwrap().is_some()); // Should have processed block 101
1735
1736        // now iterator should be exhausted
1737        assert!(range_mode.iter.peek().is_none());
1738
1739        // further calls should return None
1740        let result3 = range_mode.next().await;
1741        assert!(result3.is_ok());
1742        assert!(result3.unwrap().is_none());
1743    }
1744
1745    #[tokio::test]
1746    async fn test_cached_mode_with_mock_receipts() {
1747        // create test data
1748        let test_hash = FixedBytes::from([42u8; 32]);
1749        let test_block_number = 100u64;
1750        let test_header = SealedHeader::new(
1751            alloy_consensus::Header {
1752                number: test_block_number,
1753                gas_used: 50_000,
1754                ..Default::default()
1755            },
1756            test_hash,
1757        );
1758
1759        // add a mock receipt to the provider with a mock log
1760        let mock_log = alloy_primitives::Log {
1761            address: alloy_primitives::Address::ZERO,
1762            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1763        };
1764
1765        let mock_receipt = reth_ethereum_primitives::Receipt {
1766            tx_type: TxType::Legacy,
1767            cumulative_gas_used: 21_000,
1768            logs: vec![mock_log],
1769            success: true,
1770        };
1771
1772        let provider = MockEthProvider::default();
1773        provider.add_header(test_hash, test_header.header().clone());
1774        provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1775
1776        let eth_api = build_test_eth_api(provider);
1777        let eth_filter =
1778            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1779        let filter_inner = eth_filter.inner;
1780
1781        let headers = vec![test_header.clone()];
1782
1783        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1784
1785        // should find the receipt from provider fallback (cache will be empty)
1786        let result = cached_mode.next().await.expect("next should succeed");
1787        let receipt_block_result = result.expect("should have receipt result");
1788        assert_eq!(receipt_block_result.header.hash(), test_hash);
1789        assert_eq!(receipt_block_result.header.number, test_block_number);
1790        assert_eq!(receipt_block_result.receipts.len(), 1);
1791        assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1792        assert_eq!(
1793            receipt_block_result.receipts[0].cumulative_gas_used,
1794            mock_receipt.cumulative_gas_used
1795        );
1796        assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1797
1798        // iterator should be exhausted
1799        let result2 = cached_mode.next().await;
1800        assert!(result2.is_ok());
1801        assert!(result2.unwrap().is_none());
1802    }
1803
1804    #[tokio::test]
1805    async fn test_cached_mode_empty_headers() {
1806        let provider = MockEthProvider::default();
1807        let eth_api = build_test_eth_api(provider);
1808
1809        let eth_filter =
1810            super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1811        let filter_inner = eth_filter.inner;
1812
1813        let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1814
1815        let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1816
1817        // should immediately return None for empty headers
1818        let result = cached_mode.next().await.expect("next should succeed");
1819        assert!(result.is_none());
1820    }
1821
1822    #[tokio::test]
1823    async fn test_log_limit_retry_range_excludes_overflow_block() {
1824        let provider = MockEthProvider::default();
1825
1826        use alloy_consensus::TxLegacy;
1827        use reth_db_api::models::StoredBlockBodyIndices;
1828        use reth_ethereum_primitives::{TransactionSigned, TxType};
1829
1830        let tx_inner = TxLegacy {
1831            chain_id: Some(1),
1832            nonce: 0,
1833            gas_price: 21_000,
1834            gas_limit: 21_000,
1835            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1836            value: alloy_primitives::U256::ZERO,
1837            input: alloy_primitives::Bytes::new(),
1838        };
1839        let signature = alloy_primitives::Signature::test_signature();
1840        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1841
1842        let mock_log = alloy_primitives::Log {
1843            address: alloy_primitives::Address::ZERO,
1844            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1845        };
1846
1847        let receipt = reth_ethereum_primitives::Receipt {
1848            tx_type: TxType::Legacy,
1849            cumulative_gas_used: 21_000,
1850            logs: vec![mock_log],
1851            success: true,
1852        };
1853
1854        let mut prev_hash = alloy_primitives::B256::default();
1855        for (idx, block_number) in (100u64..=102).enumerate() {
1856            let header = alloy_consensus::Header {
1857                number: block_number,
1858                parent_hash: prev_hash,
1859                logs_bloom: alloy_primitives::Bloom::from([1u8; 256]),
1860                ..Default::default()
1861            };
1862            let hash = header.hash_slow();
1863            prev_hash = hash;
1864
1865            let block = reth_ethereum_primitives::Block {
1866                header,
1867                body: reth_ethereum_primitives::BlockBody {
1868                    transactions: vec![tx.clone()],
1869                    ..Default::default()
1870                },
1871            };
1872            provider.add_block(hash, block);
1873            provider.add_receipts(block_number, vec![receipt.clone()]);
1874            provider.add_block_body_indices(
1875                block_number,
1876                StoredBlockBodyIndices { first_tx_num: idx as u64, tx_count: 1 },
1877            );
1878        }
1879
1880        let eth_api = build_test_eth_api(provider);
1881        let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1882        let err = eth_filter
1883            .inner
1884            .clone()
1885            .get_logs_in_block_range(
1886                Filter::default(),
1887                100,
1888                102,
1889                QueryLimits { max_blocks_per_filter: None, max_logs_per_response: Some(2) },
1890            )
1891            .await
1892            .expect_err("range should exceed max logs");
1893
1894        let EthFilterError::QueryExceedsMaxResults { max_logs, from_block, to_block } = err else {
1895            panic!("unexpected error: {err:?}");
1896        };
1897
1898        assert_eq!(max_logs, 2);
1899        assert_eq!(from_block, 100);
1900        assert_eq!(to_block, 101);
1901    }
1902
1903    #[tokio::test]
1904    async fn test_non_consecutive_headers_after_bloom_filter() {
1905        let provider = MockEthProvider::default();
1906
1907        // Create 4 headers where only blocks 100 and 102 will match bloom filter
1908        let mut expected_hashes = vec![];
1909        let mut prev_hash = alloy_primitives::B256::default();
1910
1911        // Create a transaction for blocks that will have receipts
1912        use alloy_consensus::TxLegacy;
1913        use reth_ethereum_primitives::{TransactionSigned, TxType};
1914
1915        let tx_inner = TxLegacy {
1916            chain_id: Some(1),
1917            nonce: 0,
1918            gas_price: 21_000,
1919            gas_limit: 21_000,
1920            to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1921            value: alloy_primitives::U256::ZERO,
1922            input: alloy_primitives::Bytes::new(),
1923        };
1924        let signature = alloy_primitives::Signature::test_signature();
1925        let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1926
1927        for i in 100u64..=103 {
1928            let header = alloy_consensus::Header {
1929                number: i,
1930                parent_hash: prev_hash,
1931                // Set bloom to match filter only for blocks 100 and 102
1932                logs_bloom: if i == 100 || i == 102 {
1933                    alloy_primitives::Bloom::from([1u8; 256])
1934                } else {
1935                    alloy_primitives::Bloom::default()
1936                },
1937                ..Default::default()
1938            };
1939
1940            let hash = header.hash_slow();
1941            expected_hashes.push(hash);
1942            prev_hash = hash;
1943
1944            // Add transaction to blocks that will have receipts (100 and 102)
1945            let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1946
1947            let block = reth_ethereum_primitives::Block {
1948                header,
1949                body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1950            };
1951            provider.add_block(hash, block);
1952        }
1953
1954        // Add receipts with logs only to blocks that match bloom
1955        let mock_log = alloy_primitives::Log {
1956            address: alloy_primitives::Address::ZERO,
1957            data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1958        };
1959
1960        let receipt = reth_ethereum_primitives::Receipt {
1961            tx_type: TxType::Legacy,
1962            cumulative_gas_used: 21_000,
1963            logs: vec![mock_log],
1964            success: true,
1965        };
1966
1967        provider.add_receipts(100, vec![receipt.clone()]);
1968        provider.add_receipts(101, vec![]);
1969        provider.add_receipts(102, vec![receipt.clone()]);
1970        provider.add_receipts(103, vec![]);
1971
1972        // Add block body indices for each block so receipts can be fetched
1973        use reth_db_api::models::StoredBlockBodyIndices;
1974        provider
1975            .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1976        provider
1977            .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1978        provider
1979            .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1980        provider
1981            .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1982
1983        let eth_api = build_test_eth_api(provider);
1984        let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1985
1986        // Use default filter which will match any non-empty bloom
1987        let filter = Filter::default();
1988
1989        // Get logs in the range - this will trigger the bloom filtering
1990        let logs = eth_filter
1991            .inner
1992            .clone()
1993            .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1994            .await
1995            .expect("should succeed");
1996
1997        // We should get logs from blocks 100 and 102 only (bloom filtered)
1998        assert_eq!(logs.len(), 2);
1999
2000        assert_eq!(logs[0].block_number, Some(100));
2001        assert_eq!(logs[1].block_number, Some(102));
2002
2003        // Each block hash should be the hash of its own header, not derived from any other header
2004        assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); // block 100
2005        assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); // block 102
2006    }
2007}