1use alloy_consensus::BlockHeader;
4use alloy_primitives::{Sealable, TxHash};
5use alloy_rpc_types_eth::{
6 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7 PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::{
11 future::TryFutureExt,
12 stream::{FuturesOrdered, StreamExt},
13 Future,
14};
15use itertools::Itertools;
16use jsonrpsee::{core::RpcResult, server::IdProvider};
17use reth_errors::ProviderError;
18use reth_primitives_traits::{NodePrimitives, SealedHeader};
19use reth_rpc_eth_api::{
20 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
21 RpcNodeCoreExt, RpcTransaction,
22};
23use reth_rpc_eth_types::{
24 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
25 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
26};
27use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
28use reth_storage_api::{
29 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
30 ProviderReceipt, ReceiptProvider,
31};
32use reth_tasks::TaskSpawner;
33use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
34use std::{
35 collections::{HashMap, VecDeque},
36 fmt,
37 iter::{Peekable, StepBy},
38 ops::RangeInclusive,
39 pin::Pin,
40 sync::Arc,
41 time::{Duration, Instant},
42};
43use tokio::{
44 sync::{mpsc::Receiver, oneshot, Mutex},
45 time::MissedTickBehavior,
46};
47use tracing::{debug, error, trace};
48
49impl<Eth> EngineEthFilter for EthFilter<Eth>
50where
51 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
52{
53 fn logs(
55 &self,
56 filter: Filter,
57 limits: QueryLimits,
58 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
59 trace!(target: "rpc::eth", "Serving eth_getLogs");
60 self.logs_for_filter(filter, limits).map_err(|e| e.into())
61 }
62}
63
/// Base threshold, in blocks, used when deciding whether a range query is served in cached
/// mode. Both the queried block count and the distance of the range end from the chain tip are
/// compared against it (after adjustment, see `RangeMode::calculate_adjusted_threshold`).
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Number of bloom-matching headers above which the cached-mode threshold is halved.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Number of bloom-matching headers above which the cached-mode threshold is reduced to 3/4.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Queries spanning at most this many blocks always use the unadjusted cached-mode threshold.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Step used to split a queried block range into header sub-range requests; also the maximum
/// number of consecutive headers grouped together when fetching receipts in range mode.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Minimum number of headers (remaining plus currently collected) at which range mode fetches
/// receipts via parallel tasks instead of processing the collected range inline.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Divisor used to size the header chunks handed to the parallel receipt-fetch tasks.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
84
/// `eth` filter RPC implementation.
///
/// All state lives behind a shared [`Arc`], so cloning this type only clones the handle.
pub struct EthFilter<Eth: EthApiTypes> {
    /// Shared state: the wrapped API, installed filters and configuration.
    inner: Arc<EthFilterInner<Eth>>,
}
92
93impl<Eth> Clone for EthFilter<Eth>
94where
95 Eth: EthApiTypes,
96{
97 fn clone(&self) -> Self {
98 Self { inner: self.inner.clone() }
99 }
100}
101
102impl<Eth> EthFilter<Eth>
103where
104 Eth: EthApiTypes + 'static,
105{
106 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
134 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
135 config;
136 let inner = EthFilterInner {
137 eth_api,
138 active_filters: ActiveFilters::new(),
139 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
140 max_headers_range: MAX_HEADERS_RANGE,
141 task_spawner,
142 stale_filter_ttl,
143 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
144 };
145
146 let eth_filter = Self { inner: Arc::new(inner) };
147
148 let this = eth_filter.clone();
149 eth_filter.inner.task_spawner.spawn_critical(
150 "eth-filters_stale-filters-clean",
151 Box::pin(async move {
152 this.watch_and_clear_stale_filters().await;
153 }),
154 );
155
156 eth_filter
157 }
158
    /// Returns the collection of currently installed filters.
    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
        &self.inner.active_filters
    }
163
164 async fn watch_and_clear_stale_filters(&self) {
167 let mut interval = tokio::time::interval_at(
168 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
169 self.inner.stale_filter_ttl,
170 );
171 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
172 loop {
173 interval.tick().await;
174 self.clear_stale_filters(Instant::now()).await;
175 }
176 }
177
178 pub async fn clear_stale_filters(&self, now: Instant) {
181 trace!(target: "rpc::eth", "clear stale filters");
182 self.active_filters().inner.lock().await.retain(|id, filter| {
183 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
184
185 if !is_valid {
186 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
187 }
188
189 is_valid
190 })
191 }
192}
193
194impl<Eth> EthFilter<Eth>
195where
196 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
197{
    /// Returns the provider of the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.inner.eth_api.provider()
    }
202
    /// Returns the transaction pool of the wrapped `eth` API.
    fn pool(&self) -> &Eth::Pool {
        self.inner.eth_api.pool()
    }
207
208 pub async fn filter_changes(
210 &self,
211 id: FilterId,
212 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
213 let info = self.provider().chain_info()?;
214 let best_number = info.best_number;
215
216 let (start_block, kind) = {
219 let mut filters = self.inner.active_filters.inner.lock().await;
220 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
221
222 if filter.block > best_number {
223 return Ok(FilterChanges::Empty)
225 }
226
227 let mut block = best_number + 1;
231 std::mem::swap(&mut filter.block, &mut block);
232 filter.last_poll_timestamp = Instant::now();
233
234 (block, filter.kind.clone())
235 };
236
237 match kind {
238 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
239 FilterKind::Block => {
240 let end_block = best_number + 1;
243 let block_hashes =
244 self.provider().canonical_hashes_range(start_block, end_block).map_err(
245 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
246 )?;
247 Ok(FilterChanges::Hashes(block_hashes))
248 }
249 FilterKind::Log(filter) => {
250 let (from_block_number, to_block_number) = match filter.block_option {
251 FilterBlockOption::Range { from_block, to_block } => {
252 let from = from_block
253 .map(|num| self.provider().convert_block_number(num))
254 .transpose()?
255 .flatten();
256 let to = to_block
257 .map(|num| self.provider().convert_block_number(num))
258 .transpose()?
259 .flatten();
260 logs_utils::get_filter_block_range(from, to, start_block, info)
261 }
262 FilterBlockOption::AtBlockHash(_) => {
263 (start_block, best_number)
267 }
268 };
269 let logs = self
270 .inner
271 .clone()
272 .get_logs_in_block_range(
273 *filter,
274 from_block_number,
275 to_block_number,
276 self.inner.query_limits,
277 )
278 .await?;
279 Ok(FilterChanges::Logs(logs))
280 }
281 }
282 }
283
284 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
290 let filter = {
291 let filters = self.inner.active_filters.inner.lock().await;
292 if let FilterKind::Log(ref filter) =
293 filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
294 {
295 *filter.clone()
296 } else {
297 return Err(EthFilterError::FilterNotFound(id))
299 }
300 };
301
302 self.logs_for_filter(filter, self.inner.query_limits).await
303 }
304
    /// Returns the logs matching `filter`, bounded by `limits`.
    ///
    /// Delegates to the inner state's `logs_for_filter` on a cloned `Arc` handle.
    async fn logs_for_filter(
        &self,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        self.inner.clone().logs_for_filter(filter, limits).await
    }
313}
314
315#[async_trait]
316impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
317where
318 Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
319{
    /// Handler for `eth_newFilter`: installs a log filter and returns its id.
    async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
        trace!(target: "rpc::eth", "Serving eth_newFilter");
        self.inner
            .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
            .await
    }
327
    /// Handler for `eth_newBlockFilter`: installs a block filter and returns its id.
    async fn new_block_filter(&self) -> RpcResult<FilterId> {
        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
    }
333
334 async fn new_pending_transaction_filter(
336 &self,
337 kind: Option<PendingTransactionFilterKind>,
338 ) -> RpcResult<FilterId> {
339 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
340
341 let transaction_kind = match kind.unwrap_or_default() {
342 PendingTransactionFilterKind::Hashes => {
343 let receiver = self.pool().pending_transactions_listener();
344 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
345 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
346 }
347 PendingTransactionFilterKind::Full => {
348 let stream = self.pool().new_pending_pool_transactions_listener();
349 let full_txs_receiver = FullTransactionsReceiver::new(
350 stream,
351 self.inner.eth_api.tx_resp_builder().clone(),
352 );
353 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
354 full_txs_receiver,
355 )))
356 }
357 };
358
359 self.inner.install_filter(transaction_kind).await
363 }
364
    /// Handler for `eth_getFilterChanges`: delegates to the inherent
    /// `EthFilter::filter_changes` and converts its error into an RPC error.
    async fn filter_changes(
        &self,
        id: FilterId,
    ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
        trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
        Ok(Self::filter_changes(self, id).await?)
    }
373
    /// Handler for `eth_getFilterLogs`: delegates to the inherent `EthFilter::filter_logs` and
    /// converts its error into an RPC error.
    async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
        trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
        Ok(Self::filter_logs(self, id).await?)
    }
383
384 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
386 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
387 let mut filters = self.inner.active_filters.inner.lock().await;
388 if filters.remove(&id).is_some() {
389 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
390 Ok(true)
391 } else {
392 Ok(false)
393 }
394 }
395
    /// Handler for `eth_getLogs`: returns the logs matching `filter`, bounded by the configured
    /// query limits.
    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
        trace!(target: "rpc::eth", "Serving eth_getLogs");
        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
    }
403}
404
405impl<Eth> std::fmt::Debug for EthFilter<Eth>
406where
407 Eth: EthApiTypes,
408{
409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410 f.debug_struct("EthFilter").finish_non_exhaustive()
411 }
412}
413
/// Shared state behind every [`EthFilter`] handle.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// The wrapped `eth` API; source of the provider, pool, cache and transaction converter.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Generates the ids of newly installed filters.
    id_provider: Arc<dyn IdProvider>,
    /// Limits applied to log queries served through this type's own RPC handlers.
    query_limits: QueryLimits,
    /// Step used when fetching headers range by range, and the maximum number of consecutive
    /// headers grouped together in range mode.
    max_headers_range: u64,
    /// Spawner for the stale-filter cleanup task and for blocking log queries.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration after which a filter that has not been polled is evicted.
    stale_filter_ttl: Duration,
}
432
433impl<Eth> EthFilterInner<Eth>
434where
435 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
436 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
437 + 'static,
438{
    /// Returns the provider of the wrapped `eth` API.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }
443
    /// Returns the state cache of the wrapped `eth` API.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
448
449 async fn logs_for_filter(
451 self: Arc<Self>,
452 filter: Filter,
453 limits: QueryLimits,
454 ) -> Result<Vec<Log>, EthFilterError> {
455 match filter.block_option {
456 FilterBlockOption::AtBlockHash(block_hash) => {
457 let header = self
460 .provider()
461 .header_by_hash_or_number(block_hash.into())?
462 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
463
464 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
465
466 let (receipts, maybe_block) = self
469 .eth_cache()
470 .get_receipts_and_maybe_block(block_num_hash.hash)
471 .await?
472 .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
473
474 let mut all_logs = Vec::new();
475 append_matching_block_logs(
476 &mut all_logs,
477 maybe_block
478 .map(ProviderOrBlock::Block)
479 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
480 &filter,
481 block_num_hash,
482 &receipts,
483 false,
484 header.timestamp(),
485 )?;
486
487 Ok(all_logs)
488 }
489 FilterBlockOption::Range { from_block, to_block } => {
490 let info = self.provider().chain_info()?;
492
493 let start_block = info.best_number;
495 let from = from_block
496 .map(|num| self.provider().convert_block_number(num))
497 .transpose()?
498 .flatten();
499 let to = to_block
500 .map(|num| self.provider().convert_block_number(num))
501 .transpose()?
502 .flatten();
503 let (from_block_number, to_block_number) =
504 logs_utils::get_filter_block_range(from, to, start_block, info);
505 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
506 .await
507 }
508 }
509 }
510
511 async fn install_filter(
513 &self,
514 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
515 ) -> RpcResult<FilterId> {
516 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
517 let subscription_id = self.id_provider.next_id();
518
519 let id = match subscription_id {
520 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
521 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
522 };
523 let mut filters = self.active_filters.inner.lock().await;
524 filters.insert(
525 id.clone(),
526 ActiveFilter {
527 block: last_poll_block_number,
528 last_poll_timestamp: Instant::now(),
529 kind,
530 },
531 );
532 Ok(id)
533 }
534
535 async fn get_logs_in_block_range(
541 self: Arc<Self>,
542 filter: Filter,
543 from_block: u64,
544 to_block: u64,
545 limits: QueryLimits,
546 ) -> Result<Vec<Log>, EthFilterError> {
547 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
548
549 if to_block < from_block {
551 return Err(EthFilterError::InvalidBlockRangeParams)
552 }
553
554 if let Some(max_blocks_per_filter) =
555 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
556 {
557 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
558 }
559
560 let (tx, rx) = oneshot::channel();
561 let this = self.clone();
562 self.task_spawner.spawn_blocking(Box::pin(async move {
563 let res =
564 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
565 let _ = tx.send(res);
566 }));
567
568 rx.await.map_err(|_| EthFilterError::InternalError)?
569 }
570
571 async fn get_logs_in_block_range_inner(
580 self: Arc<Self>,
581 filter: &Filter,
582 from_block: u64,
583 to_block: u64,
584 limits: QueryLimits,
585 ) -> Result<Vec<Log>, EthFilterError> {
586 let mut all_logs = Vec::new();
587 let mut matching_headers = Vec::new();
588
589 let chain_tip = self.provider().best_block_number()?;
591
592 for (from, to) in
594 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
595 {
596 let headers = self.provider().headers_range(from..=to)?;
597
598 let mut headers_iter = headers.into_iter().peekable();
599
600 while let Some(header) = headers_iter.next() {
601 if !filter.matches_bloom(header.logs_bloom()) {
602 continue
603 }
604
605 let current_number = header.number();
606
607 let block_hash = match headers_iter.peek() {
608 Some(next_header) if next_header.number() == current_number + 1 => {
609 next_header.parent_hash()
611 }
612 _ => {
613 header.hash_slow()
615 }
616 };
617
618 matching_headers.push(SealedHeader::new(header, block_hash));
619 }
620 }
621
622 let mut range_mode = RangeMode::new(
624 self.clone(),
625 matching_headers,
626 from_block,
627 to_block,
628 self.max_headers_range,
629 chain_tip,
630 );
631
632 while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
634 range_mode.next().await?
635 {
636 let num_hash = header.num_hash();
637 append_matching_block_logs(
638 &mut all_logs,
639 recovered_block
640 .map(ProviderOrBlock::Block)
641 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
642 filter,
643 num_hash,
644 &receipts,
645 false,
646 header.timestamp(),
647 )?;
648
649 let is_multi_block_range = from_block != to_block;
652 if let Some(max_logs_per_response) = limits.max_logs_per_response {
653 if is_multi_block_range && all_logs.len() > max_logs_per_response {
654 debug!(
655 target: "rpc::eth::filter",
656 logs_found = all_logs.len(),
657 max_logs_per_response,
658 from_block,
659 to_block = num_hash.number.saturating_sub(1),
660 "Query exceeded max logs per response limit"
661 );
662 return Err(EthFilterError::QueryExceedsMaxResults {
663 max_logs: max_logs_per_response,
664 from_block,
665 to_block: num_hash.number.saturating_sub(1),
666 });
667 }
668 }
669 }
670
671 Ok(all_logs)
672 }
673}
674
/// All currently installed filters, keyed by filter id.
///
/// Clones share the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters<T> {
    /// Installed filters behind an async mutex.
    inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
}
680
681impl<T> ActiveFilters<T> {
682 pub fn new() -> Self {
684 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
685 }
686}
687
/// An installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// Block number the next poll starts from.
    block: u64,
    /// When the filter was installed or last polled; used to evict stale filters.
    last_poll_timestamp: Instant,
    /// What the filter reports.
    kind: FilterKind<T>,
}
698
/// Receiver of pending transaction hashes; clones share the same channel.
#[derive(Debug, Clone)]
struct PendingTransactionsReceiver {
    /// Channel delivering the hashes of pending transactions.
    txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
}
704
705impl PendingTransactionsReceiver {
706 fn new(receiver: Receiver<TxHash>) -> Self {
707 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
708 }
709
710 async fn drain<T>(&self) -> FilterChanges<T> {
712 let mut pending_txs = Vec::new();
713 let mut prepared_stream = self.txs_receiver.lock().await;
714
715 while let Ok(tx_hash) = prepared_stream.try_recv() {
716 pending_txs.push(tx_hash);
717 }
718
719 FilterChanges::Hashes(pending_txs)
721 }
722}
723
/// Receiver of full pending transactions, converted to RPC transactions when drained.
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
    /// Stream delivering new pool transactions.
    txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
    /// Converter used to turn pool transactions into RPC transaction objects.
    tx_resp_builder: TxCompat,
}
730
731impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
732where
733 T: PoolTransaction + 'static,
734 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
735{
736 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
738 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
739 }
740
741 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
743 let mut pending_txs = Vec::new();
744 let mut prepared_stream = self.txs_stream.lock().await;
745
746 while let Ok(tx) = prepared_stream.try_recv() {
747 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
748 Ok(tx) => pending_txs.push(tx),
749 Err(err) => {
750 error!(target: "rpc",
751 %err,
752 "Failed to fill txn with block context"
753 );
754 }
755 }
756 }
757 FilterChanges::Transactions(pending_txs)
758 }
759}
760
/// Object-safe abstraction over a source of full pending transactions, so receivers with
/// different pool transaction types can be stored behind one `dyn` type.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the changes accumulated since the previous call.
    async fn drain(&self) -> FilterChanges<T>;
}
766
#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
{
    /// Forwards to the inherent `FullTransactionsReceiver::drain`.
    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
        Self::drain(self).await
    }
}
778
/// The two flavours of pending-transaction filter.
#[derive(Debug, Clone)]
enum PendingTransactionKind<T> {
    /// Reports only transaction hashes.
    Hashes(PendingTransactionsReceiver),
    /// Reports full transaction objects.
    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
}
789
impl<T: 'static> PendingTransactionKind<T> {
    /// Drains the underlying receiver, whichever flavour it is.
    async fn drain(&self) -> FilterChanges<T> {
        match self {
            Self::Hashes(receiver) => receiver.drain().await,
            Self::FullTransaction(receiver) => receiver.drain().await,
        }
    }
}
798
/// What an installed filter reports when polled.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// Logs matching the boxed filter.
    Log(Box<Filter>),
    /// Hashes of new blocks.
    Block,
    /// Pending transactions, as hashes or full objects.
    PendingTransaction(PendingTransactionKind<T>),
}
805
/// Iterator that splits an inclusive block range into consecutive inclusive sub-ranges.
///
/// Each item is a `(start, end)` pair with `end - start <= step`; the last pair is clamped to
/// the end of the original range. Consecutive pairs do not overlap: each `start` equals the
/// previous `end + 1`.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the start of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Maximum distance between the start and the end of a sub-range.
    step: u64,
    /// Inclusive end of the whole range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose sub-ranges each span at most `step + 1` blocks.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // A sub-range covers `start..=start + step`, so the next one starts `step + 1` later.
        // Saturate instead of overflowing for very large steps.
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start` never exceeds `self.end`, so after saturating and clamping `end >= start`
        // always holds, even for ranges close to `u64::MAX`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
832
/// Errors that can occur while serving filter and log requests.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// No (matching) filter is installed under the given id.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// The requested range ends before it starts.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// The requested range spans more blocks than the contained limit allows.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// The query produced more logs than allowed.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response.
        max_logs: usize,
        /// Start of the suggested retry range.
        from_block: u64,
        /// End of the suggested retry range.
        to_block: u64,
    },
    /// Error raised by the underlying `eth` API.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// The filter task failed without producing a result.
    #[error("internal filter error")]
    InternalError,
}
862
863impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
864 fn from(err: EthFilterError) -> Self {
865 match err {
866 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
867 jsonrpsee::types::error::INVALID_PARAMS_CODE,
868 "filter not found",
869 ),
870 err @ EthFilterError::InternalError => {
871 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
872 }
873 EthFilterError::EthAPIError(err) => err.into(),
874 err @ (EthFilterError::InvalidBlockRangeParams |
875 EthFilterError::QueryExceedsMaxBlocks(_) |
876 EthFilterError::QueryExceedsMaxResults { .. }) => {
877 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
878 }
879 }
880 }
881}
882
impl From<ProviderError> for EthFilterError {
    /// Wraps a provider error by first converting it into an [`EthApiError`].
    fn from(err: ProviderError) -> Self {
        Self::EthAPIError(err.into())
    }
}
888
/// Receipts of a single block, plus what is needed to turn them into logs.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// All receipts of the block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// The recovered block, when it was available alongside the receipts.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// Sealed header of the block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
902
/// Strategy used to fetch the receipts of bloom-matching headers.
enum RangeMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Fetches receipts one header at a time through the state cache.
    Cached(CachedMode<Eth>),
    /// Fetches receipts for runs of consecutive headers, possibly in parallel tasks.
    Range(RangeBlockMode<Eth>),
}
912
913impl<
914 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
915 > RangeMode<Eth>
916{
917 fn new(
919 filter_inner: Arc<EthFilterInner<Eth>>,
920 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
921 from_block: u64,
922 to_block: u64,
923 max_headers_range: u64,
924 chain_tip: u64,
925 ) -> Self {
926 let block_count = to_block - from_block + 1;
927 let distance_from_tip = chain_tip.saturating_sub(to_block);
928
929 let use_cached_mode =
931 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
932
933 if use_cached_mode && !sealed_headers.is_empty() {
934 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
935 } else {
936 Self::Range(RangeBlockMode {
937 filter_inner,
938 iter: sealed_headers.into_iter().peekable(),
939 next: VecDeque::new(),
940 max_range: max_headers_range as usize,
941 pending_tasks: FuturesOrdered::new(),
942 })
943 }
944 }
945
946 const fn should_use_cached_mode(
948 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
949 block_count: u64,
950 distance_from_tip: u64,
951 ) -> bool {
952 let bloom_matches = headers.len();
954
955 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
957
958 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
959 }
960
961 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
963 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
965 return CACHED_MODE_BLOCK_THRESHOLD;
966 }
967
968 match bloom_matches {
969 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
970 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
971 _ => CACHED_MODE_BLOCK_THRESHOLD,
972 }
973 }
974
    /// Returns the receipts of the next block, or `None` once all headers are processed.
    ///
    /// Dispatches to the selected mode.
    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
        match self {
            Self::Cached(cached) => cached.next().await,
            Self::Range(range) => range.next().await,
        }
    }
982}
983
/// Receipt-fetching mode that resolves every header through the state cache.
struct CachedMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state, used to reach the state cache.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers still to be processed.
    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
991
992impl<
993 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
994 > CachedMode<Eth>
995{
996 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
997 for header in self.headers_iter.by_ref() {
998 if let Some((receipts, maybe_block)) =
1000 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1001 {
1002 return Ok(Some(ReceiptBlockResult {
1003 receipts,
1004 recovered_block: maybe_block,
1005 header,
1006 }));
1007 }
1008 }
1009
1010 Ok(None) }
1012}
1013
/// Boxed future resolving to the receipt results of one chunk of headers.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1017
/// Receipt-fetching mode that processes runs of consecutive headers.
struct RangeBlockMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state, used to reach the cache and the provider.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers still to be processed.
    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
    /// Results that are ready to be returned, in order.
    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
    /// Maximum number of consecutive headers collected into one run.
    max_range: usize,
    /// In-flight parallel fetch tasks, polled in the order they were queued.
    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
}
1029
1030impl<
1031 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1032 > RangeBlockMode<Eth>
1033{
1034 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1035 loop {
1036 if let Some(result) = self.next.pop_front() {
1038 return Ok(Some(result));
1039 }
1040
1041 if let Some(task_result) = self.pending_tasks.next().await {
1043 self.next.extend(task_result?);
1044 continue;
1045 }
1046
1047 let Some(next_header) = self.iter.next() else {
1049 return Ok(None);
1051 };
1052
1053 let mut range_headers = Vec::with_capacity(self.max_range);
1054 range_headers.push(next_header);
1055
1056 while range_headers.len() < self.max_range {
1058 let Some(peeked) = self.iter.peek() else { break };
1059 let Some(last_header) = range_headers.last() else { break };
1060
1061 let expected_next = last_header.number() + 1;
1062 if peeked.number() != expected_next {
1063 debug!(
1064 target: "rpc::eth::filter",
1065 last_block = last_header.number(),
1066 next_block = peeked.number(),
1067 expected = expected_next,
1068 range_size = range_headers.len(),
1069 "Non-consecutive block detected, stopping range collection"
1070 );
1071 break; }
1073
1074 let Some(next_header) = self.iter.next() else { break };
1075 range_headers.push(next_header);
1076 }
1077
1078 let remaining_headers = self.iter.len() + range_headers.len();
1080 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1081 self.spawn_parallel_tasks(range_headers);
1082 } else {
1084 if let Some(result) = self.process_small_range(range_headers).await? {
1086 return Ok(Some(result));
1087 }
1088 }
1090 }
1091 }
1092
1093 async fn process_small_range(
1097 &mut self,
1098 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1099 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1100 for header in range_headers {
1102 let (maybe_block, maybe_receipts) = self
1104 .filter_inner
1105 .eth_cache()
1106 .maybe_cached_block_and_receipts(header.hash())
1107 .await?;
1108
1109 let receipts = match maybe_receipts {
1110 Some(receipts) => receipts,
1111 None => {
1112 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1114 Some(receipts) => Arc::new(receipts),
1115 None => continue, }
1117 }
1118 };
1119
1120 if !receipts.is_empty() {
1121 self.next.push_back(ReceiptBlockResult {
1122 receipts,
1123 recovered_block: maybe_block,
1124 header,
1125 });
1126 }
1127 }
1128
1129 Ok(self.next.pop_front())
1130 }
1131
    /// Splits `range_headers` into chunks and queues one receipt-fetch task per chunk.
    ///
    /// Each task reads the receipts of its chunk from the provider on a blocking thread and
    /// returns the non-empty results. Tasks are pushed to `pending_tasks` in chunk order.
    fn spawn_parallel_tasks(
        &mut self,
        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
    ) {
        // Aim for `DEFAULT_PARALLEL_CONCURRENCY` chunks, with at least one header per chunk.
        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
        let header_chunks = range_headers
            .into_iter()
            .chunks(chunk_size)
            .into_iter()
            .map(|chunk| chunk.collect::<Vec<_>>())
            .collect::<Vec<_>>();

        for chunk_headers in header_chunks {
            let filter_inner = self.filter_inner.clone();
            let chunk_task = Box::pin(async move {
                let chunk_task = tokio::task::spawn_blocking(move || {
                    let mut chunk_results = Vec::new();

                    for header in chunk_headers {
                        // Read directly from the provider; the cache is not consulted here.
                        let receipts = match filter_inner
                            .provider()
                            .receipts_by_block(header.hash().into())?
                        {
                            Some(receipts) => Arc::new(receipts),
                            // No receipts stored for this block: skip it.
                            None => continue,
                        };

                        if !receipts.is_empty() {
                            chunk_results.push(ReceiptBlockResult {
                                receipts,
                                recovered_block: None,
                                header,
                            });
                        }
                    }

                    Ok(chunk_results)
                });

                // Flatten the join result: a failed join becomes an internal error.
                match chunk_task.await {
                    Ok(Ok(chunk_results)) => Ok(chunk_results),
                    Ok(Err(e)) => Err(e),
                    Err(join_err) => {
                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
                        Err(EthFilterError::InternalError)
                    }
                }
            });

            self.pending_tasks.push_back(chunk_task);
        }
    }
1193}
1194
1195#[cfg(test)]
1196mod tests {
1197 use super::*;
1198 use crate::{eth::EthApi, EthApiBuilder};
1199 use alloy_network::Ethereum;
1200 use alloy_primitives::FixedBytes;
1201 use rand::Rng;
1202 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1203 use reth_ethereum_primitives::TxType;
1204 use reth_evm_ethereum::EthEvmConfig;
1205 use reth_network_api::noop::NoopNetwork;
1206 use reth_provider::test_utils::MockEthProvider;
1207 use reth_rpc_convert::RpcConverter;
1208 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1209 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1210 use reth_tasks::TokioTaskExecutor;
1211 use reth_testing_utils::generators;
1212 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1213 use std::{collections::VecDeque, sync::Arc};
1214
1215 #[test]
1216 fn test_block_range_iter() {
1217 let mut rng = generators::rng();
1218
1219 let start = rng.random::<u32>() as u64;
1220 let end = start.saturating_add(rng.random::<u32>() as u64);
1221 let step = rng.random::<u16>() as u64;
1222 let range = start..=end;
1223 let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1224 let (from, mut end) = iter.next().unwrap();
1225 assert_eq!(from, start);
1226 assert_eq!(end, (from + step).min(*range.end()));
1227
1228 for (next_from, next_end) in iter {
1229 assert_eq!(next_from, end + 1);
1231 end = next_end;
1232 }
1233
1234 assert_eq!(end, *range.end());
1235 }
1236
1237 #[expect(clippy::type_complexity)]
1239 fn build_test_eth_api(
1240 provider: MockEthProvider,
1241 ) -> EthApi<
1242 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1243 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1244 > {
1245 EthApiBuilder::new(
1246 provider.clone(),
1247 testing_pool(),
1248 NoopNetwork::default(),
1249 EthEvmConfig::new(provider.chain_spec()),
1250 )
1251 .build()
1252 }
1253
1254 #[tokio::test]
1255 async fn test_range_block_mode_empty_range() {
1256 let provider = MockEthProvider::default();
1257 let eth_api = build_test_eth_api(provider);
1258
1259 let eth_filter = super::EthFilter::new(
1260 eth_api,
1261 EthFilterConfig::default(),
1262 Box::new(TokioTaskExecutor::default()),
1263 );
1264 let filter_inner = eth_filter.inner;
1265
1266 let headers = vec![];
1267 let max_range = 100;
1268
1269 let mut range_mode = RangeBlockMode {
1270 filter_inner,
1271 iter: headers.into_iter().peekable(),
1272 next: VecDeque::new(),
1273 max_range,
1274 pending_tasks: FuturesOrdered::new(),
1275 };
1276
1277 let result = range_mode.next().await;
1278 assert!(result.is_ok());
1279 assert!(result.unwrap().is_none());
1280 }
1281
1282 #[tokio::test]
1283 async fn test_range_block_mode_queued_results_priority() {
1284 let provider = MockEthProvider::default();
1285 let eth_api = build_test_eth_api(provider);
1286
1287 let eth_filter = super::EthFilter::new(
1288 eth_api,
1289 EthFilterConfig::default(),
1290 Box::new(TokioTaskExecutor::default()),
1291 );
1292 let filter_inner = eth_filter.inner;
1293
1294 let headers = vec![
1295 SealedHeader::new(
1296 alloy_consensus::Header { number: 100, ..Default::default() },
1297 FixedBytes::random(),
1298 ),
1299 SealedHeader::new(
1300 alloy_consensus::Header { number: 101, ..Default::default() },
1301 FixedBytes::random(),
1302 ),
1303 ];
1304
1305 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1307 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1308
1309 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1311 tx_type: TxType::Legacy,
1312 cumulative_gas_used: 100_000,
1313 logs: vec![],
1314 success: true,
1315 };
1316 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1317 tx_type: TxType::Eip1559,
1318 cumulative_gas_used: 200_000,
1319 logs: vec![],
1320 success: true,
1321 };
1322 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1323 tx_type: TxType::Eip2930,
1324 cumulative_gas_used: 150_000,
1325 logs: vec![],
1326 success: false, };
1328
1329 let mock_result_1 = ReceiptBlockResult {
1330 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1331 recovered_block: None,
1332 header: SealedHeader::new(
1333 alloy_consensus::Header { number: 42, ..Default::default() },
1334 expected_block_hash_1,
1335 ),
1336 };
1337
1338 let mock_result_2 = ReceiptBlockResult {
1339 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1340 recovered_block: None,
1341 header: SealedHeader::new(
1342 alloy_consensus::Header { number: 43, ..Default::default() },
1343 expected_block_hash_2,
1344 ),
1345 };
1346
1347 let mut range_mode = RangeBlockMode {
1348 filter_inner,
1349 iter: headers.into_iter().peekable(),
1350 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1352 pending_tasks: FuturesOrdered::new(),
1353 };
1354
1355 let result1 = range_mode.next().await;
1357 assert!(result1.is_ok());
1358 let receipt_result1 = result1.unwrap().unwrap();
1359 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1360 assert_eq!(receipt_result1.header.number, 42);
1361
1362 assert_eq!(receipt_result1.receipts.len(), 2);
1364 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1365 assert_eq!(
1366 receipt_result1.receipts[0].cumulative_gas_used,
1367 mock_receipt_1.cumulative_gas_used
1368 );
1369 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1370 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1371 assert_eq!(
1372 receipt_result1.receipts[1].cumulative_gas_used,
1373 mock_receipt_2.cumulative_gas_used
1374 );
1375 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1376
1377 let result2 = range_mode.next().await;
1379 assert!(result2.is_ok());
1380 let receipt_result2 = result2.unwrap().unwrap();
1381 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1382 assert_eq!(receipt_result2.header.number, 43);
1383
1384 assert_eq!(receipt_result2.receipts.len(), 1);
1386 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1387 assert_eq!(
1388 receipt_result2.receipts[0].cumulative_gas_used,
1389 mock_receipt_3.cumulative_gas_used
1390 );
1391 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1392
1393 assert!(range_mode.next.is_empty());
1395
1396 let result3 = range_mode.next().await;
1397 assert!(result3.is_ok());
1398 }
1399
1400 #[tokio::test]
1401 async fn test_range_block_mode_single_block_no_receipts() {
1402 let provider = MockEthProvider::default();
1403 let eth_api = build_test_eth_api(provider);
1404
1405 let eth_filter = super::EthFilter::new(
1406 eth_api,
1407 EthFilterConfig::default(),
1408 Box::new(TokioTaskExecutor::default()),
1409 );
1410 let filter_inner = eth_filter.inner;
1411
1412 let headers = vec![SealedHeader::new(
1413 alloy_consensus::Header { number: 100, ..Default::default() },
1414 FixedBytes::random(),
1415 )];
1416
1417 let mut range_mode = RangeBlockMode {
1418 filter_inner,
1419 iter: headers.into_iter().peekable(),
1420 next: VecDeque::new(),
1421 max_range: 100,
1422 pending_tasks: FuturesOrdered::new(),
1423 };
1424
1425 let result = range_mode.next().await;
1426 assert!(result.is_ok());
1427 }
1428
1429 #[tokio::test]
1430 async fn test_range_block_mode_provider_receipts() {
1431 let provider = MockEthProvider::default();
1432
1433 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1434 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1435 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1436
1437 let block_hash_1 = FixedBytes::random();
1438 let block_hash_2 = FixedBytes::random();
1439 let block_hash_3 = FixedBytes::random();
1440
1441 provider.add_header(block_hash_1, header_1.clone());
1442 provider.add_header(block_hash_2, header_2.clone());
1443 provider.add_header(block_hash_3, header_3.clone());
1444
1445 let mock_log = alloy_primitives::Log {
1447 address: alloy_primitives::Address::ZERO,
1448 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1449 };
1450
1451 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1452 tx_type: TxType::Legacy,
1453 cumulative_gas_used: 21_000,
1454 logs: vec![mock_log.clone()],
1455 success: true,
1456 };
1457 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1458 tx_type: TxType::Eip1559,
1459 cumulative_gas_used: 42_000,
1460 logs: vec![mock_log.clone()],
1461 success: true,
1462 };
1463 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1464 tx_type: TxType::Eip2930,
1465 cumulative_gas_used: 30_000,
1466 logs: vec![mock_log.clone()],
1467 success: false,
1468 };
1469
1470 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1471 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1472
1473 let eth_api = build_test_eth_api(provider);
1474
1475 let eth_filter = super::EthFilter::new(
1476 eth_api,
1477 EthFilterConfig::default(),
1478 Box::new(TokioTaskExecutor::default()),
1479 );
1480 let filter_inner = eth_filter.inner;
1481
1482 let headers = vec![
1483 SealedHeader::new(header_1, block_hash_1),
1484 SealedHeader::new(header_2, block_hash_2),
1485 SealedHeader::new(header_3, block_hash_3),
1486 ];
1487
1488 let mut range_mode = RangeBlockMode {
1489 filter_inner,
1490 iter: headers.into_iter().peekable(),
1491 next: VecDeque::new(),
1492 max_range: 3, pending_tasks: FuturesOrdered::new(),
1494 };
1495
1496 let result = range_mode.next().await;
1498 assert!(result.is_ok());
1499 let receipt_result = result.unwrap().unwrap();
1500
1501 assert_eq!(receipt_result.header.hash(), block_hash_1);
1502 assert_eq!(receipt_result.header.number, 100);
1503 assert_eq!(receipt_result.receipts.len(), 2);
1504
1505 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1507 assert_eq!(
1508 receipt_result.receipts[0].cumulative_gas_used,
1509 receipt_100_1.cumulative_gas_used
1510 );
1511 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1512
1513 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1514 assert_eq!(
1515 receipt_result.receipts[1].cumulative_gas_used,
1516 receipt_100_2.cumulative_gas_used
1517 );
1518 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1519
1520 let result2 = range_mode.next().await;
1522 assert!(result2.is_ok());
1523 let receipt_result2 = result2.unwrap().unwrap();
1524
1525 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1526 assert_eq!(receipt_result2.header.number, 101);
1527 assert_eq!(receipt_result2.receipts.len(), 1);
1528
1529 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1531 assert_eq!(
1532 receipt_result2.receipts[0].cumulative_gas_used,
1533 receipt_101_1.cumulative_gas_used
1534 );
1535 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1536
1537 let result3 = range_mode.next().await;
1539 assert!(result3.is_ok());
1540 assert!(result3.unwrap().is_none());
1541 }
1542
1543 #[tokio::test]
1544 async fn test_range_block_mode_iterator_exhaustion() {
1545 let provider = MockEthProvider::default();
1546
1547 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1548 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1549
1550 let block_hash_100 = FixedBytes::random();
1551 let block_hash_101 = FixedBytes::random();
1552
1553 provider.add_header(block_hash_100, header_100.clone());
1555 provider.add_header(block_hash_101, header_101.clone());
1556
1557 let mock_receipt = reth_ethereum_primitives::Receipt {
1559 tx_type: TxType::Legacy,
1560 cumulative_gas_used: 21_000,
1561 logs: vec![],
1562 success: true,
1563 };
1564 provider.add_receipts(100, vec![mock_receipt.clone()]);
1565 provider.add_receipts(101, vec![mock_receipt.clone()]);
1566
1567 let eth_api = build_test_eth_api(provider);
1568
1569 let eth_filter = super::EthFilter::new(
1570 eth_api,
1571 EthFilterConfig::default(),
1572 Box::new(TokioTaskExecutor::default()),
1573 );
1574 let filter_inner = eth_filter.inner;
1575
1576 let headers = vec![
1577 SealedHeader::new(header_100, block_hash_100),
1578 SealedHeader::new(header_101, block_hash_101),
1579 ];
1580
1581 let mut range_mode = RangeBlockMode {
1582 filter_inner,
1583 iter: headers.into_iter().peekable(),
1584 next: VecDeque::new(),
1585 max_range: 1,
1586 pending_tasks: FuturesOrdered::new(),
1587 };
1588
1589 let result1 = range_mode.next().await;
1590 assert!(result1.is_ok());
1591 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1596 assert!(result2.is_ok());
1597 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1601
1602 let result3 = range_mode.next().await;
1604 assert!(result3.is_ok());
1605 assert!(result3.unwrap().is_none());
1606 }
1607
1608 #[tokio::test]
1609 async fn test_cached_mode_with_mock_receipts() {
1610 let test_hash = FixedBytes::from([42u8; 32]);
1612 let test_block_number = 100u64;
1613 let test_header = SealedHeader::new(
1614 alloy_consensus::Header {
1615 number: test_block_number,
1616 gas_used: 50_000,
1617 ..Default::default()
1618 },
1619 test_hash,
1620 );
1621
1622 let mock_log = alloy_primitives::Log {
1624 address: alloy_primitives::Address::ZERO,
1625 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1626 };
1627
1628 let mock_receipt = reth_ethereum_primitives::Receipt {
1629 tx_type: TxType::Legacy,
1630 cumulative_gas_used: 21_000,
1631 logs: vec![mock_log],
1632 success: true,
1633 };
1634
1635 let provider = MockEthProvider::default();
1636 provider.add_header(test_hash, test_header.header().clone());
1637 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1638
1639 let eth_api = build_test_eth_api(provider);
1640 let eth_filter = super::EthFilter::new(
1641 eth_api,
1642 EthFilterConfig::default(),
1643 Box::new(TokioTaskExecutor::default()),
1644 );
1645 let filter_inner = eth_filter.inner;
1646
1647 let headers = vec![test_header.clone()];
1648
1649 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1650
1651 let result = cached_mode.next().await.expect("next should succeed");
1653 let receipt_block_result = result.expect("should have receipt result");
1654 assert_eq!(receipt_block_result.header.hash(), test_hash);
1655 assert_eq!(receipt_block_result.header.number, test_block_number);
1656 assert_eq!(receipt_block_result.receipts.len(), 1);
1657 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1658 assert_eq!(
1659 receipt_block_result.receipts[0].cumulative_gas_used,
1660 mock_receipt.cumulative_gas_used
1661 );
1662 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1663
1664 let result2 = cached_mode.next().await;
1666 assert!(result2.is_ok());
1667 assert!(result2.unwrap().is_none());
1668 }
1669
1670 #[tokio::test]
1671 async fn test_cached_mode_empty_headers() {
1672 let provider = MockEthProvider::default();
1673 let eth_api = build_test_eth_api(provider);
1674
1675 let eth_filter = super::EthFilter::new(
1676 eth_api,
1677 EthFilterConfig::default(),
1678 Box::new(TokioTaskExecutor::default()),
1679 );
1680 let filter_inner = eth_filter.inner;
1681
1682 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1683
1684 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1685
1686 let result = cached_mode.next().await.expect("next should succeed");
1688 assert!(result.is_none());
1689 }
1690
1691 #[tokio::test]
1692 async fn test_non_consecutive_headers_after_bloom_filter() {
1693 let provider = MockEthProvider::default();
1694
1695 let mut expected_hashes = vec![];
1697 let mut prev_hash = alloy_primitives::B256::default();
1698
1699 use alloy_consensus::TxLegacy;
1701 use reth_ethereum_primitives::{TransactionSigned, TxType};
1702
1703 let tx_inner = TxLegacy {
1704 chain_id: Some(1),
1705 nonce: 0,
1706 gas_price: 21_000,
1707 gas_limit: 21_000,
1708 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1709 value: alloy_primitives::U256::ZERO,
1710 input: alloy_primitives::Bytes::new(),
1711 };
1712 let signature = alloy_primitives::Signature::test_signature();
1713 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1714
1715 for i in 100u64..=103 {
1716 let header = alloy_consensus::Header {
1717 number: i,
1718 parent_hash: prev_hash,
1719 logs_bloom: if i == 100 || i == 102 {
1721 alloy_primitives::Bloom::from([1u8; 256])
1722 } else {
1723 alloy_primitives::Bloom::default()
1724 },
1725 ..Default::default()
1726 };
1727
1728 let hash = header.hash_slow();
1729 expected_hashes.push(hash);
1730 prev_hash = hash;
1731
1732 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1734
1735 let block = reth_ethereum_primitives::Block {
1736 header,
1737 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1738 };
1739 provider.add_block(hash, block);
1740 }
1741
1742 let mock_log = alloy_primitives::Log {
1744 address: alloy_primitives::Address::ZERO,
1745 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1746 };
1747
1748 let receipt = reth_ethereum_primitives::Receipt {
1749 tx_type: TxType::Legacy,
1750 cumulative_gas_used: 21_000,
1751 logs: vec![mock_log],
1752 success: true,
1753 };
1754
1755 provider.add_receipts(100, vec![receipt.clone()]);
1756 provider.add_receipts(101, vec![]);
1757 provider.add_receipts(102, vec![receipt.clone()]);
1758 provider.add_receipts(103, vec![]);
1759
1760 use reth_db_api::models::StoredBlockBodyIndices;
1762 provider
1763 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1764 provider
1765 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1766 provider
1767 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1768 provider
1769 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1770
1771 let eth_api = build_test_eth_api(provider);
1772 let eth_filter = EthFilter::new(
1773 eth_api,
1774 EthFilterConfig::default(),
1775 Box::new(TokioTaskExecutor::default()),
1776 );
1777
1778 let filter = Filter::default();
1780
1781 let logs = eth_filter
1783 .inner
1784 .clone()
1785 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1786 .await
1787 .expect("should succeed");
1788
1789 assert_eq!(logs.len(), 2);
1791
1792 assert_eq!(logs[0].block_number, Some(100));
1793 assert_eq!(logs[1].block_number, Some(102));
1794
1795 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1799}