1use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8 PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12 future::TryFutureExt,
13 stream::{FuturesOrdered, StreamExt},
14 Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21 helpers::{EthBlocks, LoadReceipt},
22 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23 RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32 ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37 collections::{HashMap, VecDeque},
38 fmt,
39 iter::{Peekable, StepBy},
40 ops::RangeInclusive,
41 pin::Pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 sync::{mpsc::Receiver, oneshot, Mutex},
47 time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53 Eth: FullEthApiTypes
54 + RpcNodeCoreExt<Provider: BlockIdReader>
55 + LoadReceipt
56 + EthBlocks
57 + 'static,
58{
59 fn logs(
61 &self,
62 filter: Filter,
63 limits: QueryLimits,
64 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65 trace!(target: "rpc::eth", "Serving eth_getLogs");
66 self.logs_for_filter(filter, limits).map_err(|e| e.into())
67 }
68}
69
/// Base limit used when deciding whether a log query is served in cached mode: both the number
/// of requested blocks and the distance of the range end from the chain tip must not exceed the
/// (possibly adjusted) threshold.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// When more headers than this match the bloom filter, the cached-mode threshold is halved.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// When more headers than this (but not more than [`HIGH_BLOOM_MATCH_THRESHOLD`]) match the
/// bloom filter, the cached-mode threshold is reduced to three quarters.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Ranges of at most this many blocks always use the unadjusted
/// [`CACHED_MODE_BLOCK_THRESHOLD`], regardless of the number of bloom matches.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Step used when splitting a block range into header requests; also the maximum number of
/// consecutive headers grouped into one receipt-fetch batch.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Number of outstanding matching headers from which receipts are fetched by parallel blocking
/// tasks instead of sequentially.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1_000;

/// Number of chunks a header batch is divided into when receipts are fetched in parallel.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91pub struct EthFilter<Eth: EthApiTypes> {
95 inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101 Eth: EthApiTypes,
102{
103 fn clone(&self) -> Self {
104 Self { inner: self.inner.clone() }
105 }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110 Eth: EthApiTypes + 'static,
111{
112 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
140 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141 config;
142 let inner = EthFilterInner {
143 eth_api,
144 active_filters: ActiveFilters::new(),
145 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146 max_headers_range: MAX_HEADERS_RANGE,
147 task_spawner,
148 stale_filter_ttl,
149 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150 };
151
152 let eth_filter = Self { inner: Arc::new(inner) };
153
154 let this = eth_filter.clone();
155 eth_filter.inner.task_spawner.spawn_critical(
156 "eth-filters_stale-filters-clean",
157 Box::pin(async move {
158 this.watch_and_clear_stale_filters().await;
159 }),
160 );
161
162 eth_filter
163 }
164
165 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167 &self.inner.active_filters
168 }
169
170 async fn watch_and_clear_stale_filters(&self) {
173 let mut interval = tokio::time::interval_at(
174 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175 self.inner.stale_filter_ttl,
176 );
177 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178 loop {
179 interval.tick().await;
180 self.clear_stale_filters(Instant::now()).await;
181 }
182 }
183
184 pub async fn clear_stale_filters(&self, now: Instant) {
187 trace!(target: "rpc::eth", "clear stale filters");
188 self.active_filters().inner.lock().await.retain(|id, filter| {
189 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
190
191 if !is_valid {
192 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
193 }
194
195 is_valid
196 })
197 }
198}
199
200impl<Eth> EthFilter<Eth>
201where
202 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
203 + RpcNodeCoreExt
204 + LoadReceipt
205 + EthBlocks
206 + 'static,
207{
208 fn provider(&self) -> &Eth::Provider {
210 self.inner.eth_api.provider()
211 }
212
213 fn pool(&self) -> &Eth::Pool {
215 self.inner.eth_api.pool()
216 }
217
218 pub async fn filter_changes(
220 &self,
221 id: FilterId,
222 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
223 let info = self.provider().chain_info()?;
224 let best_number = info.best_number;
225
226 let (start_block, kind) = {
229 let mut filters = self.inner.active_filters.inner.lock().await;
230 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
231
232 if filter.block > best_number {
233 return Ok(FilterChanges::Empty)
235 }
236
237 let mut block = best_number + 1;
241 std::mem::swap(&mut filter.block, &mut block);
242 filter.last_poll_timestamp = Instant::now();
243
244 (block, filter.kind.clone())
245 };
246
247 match kind {
248 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
249 FilterKind::Block => {
250 let end_block = best_number + 1;
253 let block_hashes =
254 self.provider().canonical_hashes_range(start_block, end_block).map_err(
255 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
256 )?;
257 Ok(FilterChanges::Hashes(block_hashes))
258 }
259 FilterKind::Log(filter) => {
260 let (from_block_number, to_block_number) = match filter.block_option {
261 FilterBlockOption::Range { from_block, to_block } => {
262 let from = from_block
263 .map(|num| self.provider().convert_block_number(num))
264 .transpose()?
265 .flatten();
266 let to = to_block
267 .map(|num| self.provider().convert_block_number(num))
268 .transpose()?
269 .flatten();
270 logs_utils::get_filter_block_range(from, to, start_block, info)
271 }
272 FilterBlockOption::AtBlockHash(_) => {
273 (start_block, best_number)
277 }
278 };
279 let logs = self
280 .inner
281 .clone()
282 .get_logs_in_block_range(
283 *filter,
284 from_block_number,
285 to_block_number,
286 self.inner.query_limits,
287 )
288 .await?;
289 Ok(FilterChanges::Logs(logs))
290 }
291 }
292 }
293
294 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
300 let filter = {
301 let mut filters = self.inner.active_filters.inner.lock().await;
302 let filter =
303 filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
304 if let FilterKind::Log(ref inner_filter) = filter.kind {
305 filter.last_poll_timestamp = Instant::now();
306 *inner_filter.clone()
307 } else {
308 return Err(EthFilterError::FilterNotFound(id))
310 }
311 };
312
313 self.logs_for_filter(filter, self.inner.query_limits).await
314 }
315
316 async fn logs_for_filter(
318 &self,
319 filter: Filter,
320 limits: QueryLimits,
321 ) -> Result<Vec<Log>, EthFilterError> {
322 self.inner.clone().logs_for_filter(filter, limits).await
323 }
324}
325
326#[async_trait]
327impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
328where
329 Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
330{
331 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
333 trace!(target: "rpc::eth", "Serving eth_newFilter");
334 self.inner
335 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
336 .await
337 }
338
339 async fn new_block_filter(&self) -> RpcResult<FilterId> {
341 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
342 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
343 }
344
345 async fn new_pending_transaction_filter(
347 &self,
348 kind: Option<PendingTransactionFilterKind>,
349 ) -> RpcResult<FilterId> {
350 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
351
352 let transaction_kind = match kind.unwrap_or_default() {
353 PendingTransactionFilterKind::Hashes => {
354 let receiver = self.pool().pending_transactions_listener();
355 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
356 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
357 }
358 PendingTransactionFilterKind::Full => {
359 let stream = self.pool().new_pending_pool_transactions_listener();
360 let full_txs_receiver = FullTransactionsReceiver::new(
361 stream,
362 dyn_clone::clone(self.inner.eth_api.tx_resp_builder()),
363 );
364 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
365 full_txs_receiver,
366 )))
367 }
368 };
369
370 self.inner.install_filter(transaction_kind).await
372 }
373
374 async fn filter_changes(
376 &self,
377 id: FilterId,
378 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
379 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
380 Ok(Self::filter_changes(self, id).await?)
381 }
382
383 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
389 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
390 Ok(Self::filter_logs(self, id).await?)
391 }
392
393 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
395 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
396 let mut filters = self.inner.active_filters.inner.lock().await;
397 if filters.remove(&id).is_some() {
398 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
399 Ok(true)
400 } else {
401 Ok(false)
402 }
403 }
404
405 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
409 trace!(target: "rpc::eth", "Serving eth_getLogs");
410 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
411 }
412}
413
414impl<Eth> std::fmt::Debug for EthFilter<Eth>
415where
416 Eth: EthApiTypes,
417{
418 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
419 f.debug_struct("EthFilter").finish_non_exhaustive()
420 }
421}
422
423#[derive(Debug)]
425struct EthFilterInner<Eth: EthApiTypes> {
426 eth_api: Eth,
428 active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
430 id_provider: Arc<dyn IdProvider>,
432 query_limits: QueryLimits,
434 max_headers_range: u64,
436 task_spawner: Box<dyn TaskSpawner>,
438 stale_filter_ttl: Duration,
440}
441
442impl<Eth> EthFilterInner<Eth>
443where
444 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
445 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
446 + LoadReceipt
447 + EthBlocks
448 + 'static,
449{
450 fn provider(&self) -> &Eth::Provider {
452 self.eth_api.provider()
453 }
454
455 fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
457 self.eth_api.cache()
458 }
459
460 async fn logs_for_filter(
462 self: Arc<Self>,
463 filter: Filter,
464 limits: QueryLimits,
465 ) -> Result<Vec<Log>, EthFilterError> {
466 match filter.block_option {
467 FilterBlockOption::AtBlockHash(block_hash) => {
468 let header = self
471 .provider()
472 .header_by_hash_or_number(block_hash.into())?
473 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
474
475 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
476
477 let (receipts, maybe_block) = self
480 .eth_cache()
481 .get_receipts_and_maybe_block(block_num_hash.hash)
482 .await?
483 .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
484
485 let mut all_logs = Vec::new();
486 append_matching_block_logs(
487 &mut all_logs,
488 maybe_block
489 .map(ProviderOrBlock::Block)
490 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
491 &filter,
492 block_num_hash,
493 &receipts,
494 false,
495 header.timestamp(),
496 )?;
497
498 Ok(all_logs)
499 }
500 FilterBlockOption::Range { from_block, to_block } => {
501 if from_block.is_some_and(|b| b.is_pending()) {
503 let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
504 if !(to_block.is_pending() || to_block.is_number()) {
505 return Ok(Vec::new());
507 }
508 if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
510 if let BlockNumberOrTag::Number(to_block) = to_block &&
511 to_block < pending_block.block.number()
512 {
513 return Ok(Vec::new());
515 }
516
517 let info = self.provider().chain_info()?;
518 if pending_block.block.number() > info.best_number {
519 let mut all_logs = Vec::new();
521 let timestamp = pending_block.block.timestamp();
522 let block_num_hash = pending_block.block.num_hash();
523 append_matching_block_logs(
524 &mut all_logs,
525 ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
526 &filter,
527 block_num_hash,
528 &pending_block.receipts,
529 false, timestamp,
531 )?;
532 return Ok(all_logs);
533 }
534 }
535 }
536
537 let info = self.provider().chain_info()?;
538 let start_block = info.best_number;
539 let from = from_block
540 .map(|num| self.provider().convert_block_number(num))
541 .transpose()?
542 .flatten();
543 let to = to_block
544 .map(|num| self.provider().convert_block_number(num))
545 .transpose()?
546 .flatten();
547
548 if let Some(f) = from &&
549 f > info.best_number
550 {
551 return Ok(Vec::new());
553 }
554
555 let (from_block_number, to_block_number) =
556 logs_utils::get_filter_block_range(from, to, start_block, info);
557
558 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
559 .await
560 }
561 }
562 }
563
564 async fn install_filter(
566 &self,
567 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
568 ) -> RpcResult<FilterId> {
569 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
570 let subscription_id = self.id_provider.next_id();
571
572 let id = match subscription_id {
573 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
574 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
575 };
576 let mut filters = self.active_filters.inner.lock().await;
577 filters.insert(
578 id.clone(),
579 ActiveFilter {
580 block: last_poll_block_number,
581 last_poll_timestamp: Instant::now(),
582 kind,
583 },
584 );
585 Ok(id)
586 }
587
588 async fn get_logs_in_block_range(
594 self: Arc<Self>,
595 filter: Filter,
596 from_block: u64,
597 to_block: u64,
598 limits: QueryLimits,
599 ) -> Result<Vec<Log>, EthFilterError> {
600 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
601
602 if to_block < from_block {
604 return Err(EthFilterError::InvalidBlockRangeParams)
605 }
606
607 if let Some(max_blocks_per_filter) =
608 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
609 {
610 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
611 }
612
613 let (tx, rx) = oneshot::channel();
614 let this = self.clone();
615 self.task_spawner.spawn_blocking(Box::pin(async move {
616 let res =
617 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
618 let _ = tx.send(res);
619 }));
620
621 rx.await.map_err(|_| EthFilterError::InternalError)?
622 }
623
624 async fn get_logs_in_block_range_inner(
633 self: Arc<Self>,
634 filter: &Filter,
635 from_block: u64,
636 to_block: u64,
637 limits: QueryLimits,
638 ) -> Result<Vec<Log>, EthFilterError> {
639 let mut all_logs = Vec::new();
640 let mut matching_headers = Vec::new();
641
642 let chain_tip = self.provider().best_block_number()?;
644
645 for (from, to) in
647 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
648 {
649 let headers = self.provider().headers_range(from..=to)?;
650
651 let mut headers_iter = headers.into_iter().peekable();
652
653 while let Some(header) = headers_iter.next() {
654 if !filter.matches_bloom(header.logs_bloom()) {
655 continue
656 }
657
658 let current_number = header.number();
659
660 let block_hash = match headers_iter.peek() {
661 Some(next_header) if next_header.number() == current_number + 1 => {
662 next_header.parent_hash()
664 }
665 _ => {
666 header.hash_slow()
668 }
669 };
670
671 matching_headers.push(SealedHeader::new(header, block_hash));
672 }
673 }
674
675 let mut range_mode = RangeMode::new(
677 self.clone(),
678 matching_headers,
679 from_block,
680 to_block,
681 self.max_headers_range,
682 chain_tip,
683 );
684
685 while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
687 range_mode.next().await?
688 {
689 let num_hash = header.num_hash();
690 append_matching_block_logs(
691 &mut all_logs,
692 recovered_block
693 .map(ProviderOrBlock::Block)
694 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
695 filter,
696 num_hash,
697 &receipts,
698 false,
699 header.timestamp(),
700 )?;
701
702 let is_multi_block_range = from_block != to_block;
705 if let Some(max_logs_per_response) = limits.max_logs_per_response &&
706 is_multi_block_range &&
707 all_logs.len() > max_logs_per_response
708 {
709 debug!(
710 target: "rpc::eth::filter",
711 logs_found = all_logs.len(),
712 max_logs_per_response,
713 from_block,
714 to_block = num_hash.number.saturating_sub(1),
715 "Query exceeded max logs per response limit"
716 );
717 return Err(EthFilterError::QueryExceedsMaxResults {
718 max_logs: max_logs_per_response,
719 from_block,
720 to_block: num_hash.number.saturating_sub(1),
721 });
722 }
723 }
724
725 Ok(all_logs)
726 }
727}
728
729#[derive(Debug, Clone, Default)]
731pub struct ActiveFilters<T> {
732 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
733}
734
735impl<T> ActiveFilters<T> {
736 pub fn new() -> Self {
738 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
739 }
740}
741
742#[derive(Debug)]
744struct ActiveFilter<T> {
745 block: u64,
747 last_poll_timestamp: Instant,
749 kind: FilterKind<T>,
751}
752
753#[derive(Debug, Clone)]
755struct PendingTransactionsReceiver {
756 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
757}
758
759impl PendingTransactionsReceiver {
760 fn new(receiver: Receiver<TxHash>) -> Self {
761 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
762 }
763
764 async fn drain<T>(&self) -> FilterChanges<T> {
766 let mut pending_txs = Vec::new();
767 let mut prepared_stream = self.txs_receiver.lock().await;
768
769 while let Ok(tx_hash) = prepared_stream.try_recv() {
770 pending_txs.push(tx_hash);
771 }
772
773 FilterChanges::Hashes(pending_txs)
775 }
776}
777
778#[derive(Debug, Clone)]
780struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
781 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
782 tx_resp_builder: TxCompat,
783}
784
785impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
786where
787 T: PoolTransaction + 'static,
788 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
789{
790 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
792 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
793 }
794
795 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
797 let mut pending_txs = Vec::new();
798 let mut prepared_stream = self.txs_stream.lock().await;
799
800 while let Ok(tx) = prepared_stream.try_recv() {
801 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
802 Ok(tx) => pending_txs.push(tx),
803 Err(err) => {
804 error!(target: "rpc",
805 %err,
806 "Failed to fill txn with block context"
807 );
808 }
809 }
810 }
811 FilterChanges::Transactions(pending_txs)
812 }
813}
814
815#[async_trait]
817trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
818 async fn drain(&self) -> FilterChanges<T>;
819}
820
821#[async_trait]
822impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
823 for FullTransactionsReceiver<T, TxCompat>
824where
825 T: PoolTransaction + 'static,
826 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
827{
828 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
829 Self::drain(self).await
830 }
831}
832
833#[derive(Debug, Clone)]
839enum PendingTransactionKind<T> {
840 Hashes(PendingTransactionsReceiver),
841 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
842}
843
844impl<T: 'static> PendingTransactionKind<T> {
845 async fn drain(&self) -> FilterChanges<T> {
846 match self {
847 Self::Hashes(receiver) => receiver.drain().await,
848 Self::FullTransaction(receiver) => receiver.drain().await,
849 }
850 }
851}
852
853#[derive(Clone, Debug)]
854enum FilterKind<T> {
855 Log(Box<Filter>),
856 Block,
857 PendingTransaction(PendingTransactionKind<T>),
858}
859
/// Splits an inclusive block range into consecutive, non-overlapping inclusive chunks.
///
/// Every chunk is `(start, end)` with `end - start <= step`; together the chunks cover the
/// original range exactly once.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full chunk.
    step: u64,
    /// Last block of the whole range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose chunks span at most `step + 1` blocks.
    ///
    /// A `step` too large for the platform's `usize` stride is clamped, which only makes the
    /// chunks smaller than requested; it never causes a panic or overlapping chunks.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Largest step whose stride (`step + 1`) still fits into `usize`.
        let max_step = u64::try_from(usize::MAX - 1).unwrap_or(u64::MAX);
        let step = step.min(max_step);
        // After clamping, the conversion succeeds and `+ 1` cannot overflow; the fallback
        // is never taken.
        let stride = usize::try_from(step).map_or(usize::MAX, |s| s + 1);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next `(start, end)` chunk, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate so that ranges ending near `u64::MAX` do not overflow; the result is
        // clamped to the range end anyway. `start <= self.end` always holds here, so the
        // chunk is never inverted.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
886
887#[derive(Debug, thiserror::Error)]
889pub enum EthFilterError {
890 #[error("filter not found")]
892 FilterNotFound(FilterId),
893 #[error("invalid block range params")]
895 InvalidBlockRangeParams,
896 #[error("query exceeds max block range {0}")]
898 QueryExceedsMaxBlocks(u64),
899 #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
901 QueryExceedsMaxResults {
902 max_logs: usize,
904 from_block: u64,
906 to_block: u64,
908 },
909 #[error(transparent)]
911 EthAPIError(#[from] EthApiError),
912 #[error("internal filter error")]
914 InternalError,
915}
916
917impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
918 fn from(err: EthFilterError) -> Self {
919 match err {
920 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
921 jsonrpsee::types::error::INVALID_PARAMS_CODE,
922 "filter not found",
923 ),
924 err @ EthFilterError::InternalError => {
925 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
926 }
927 EthFilterError::EthAPIError(err) => err.into(),
928 err @ (EthFilterError::InvalidBlockRangeParams |
929 EthFilterError::QueryExceedsMaxBlocks(_) |
930 EthFilterError::QueryExceedsMaxResults { .. }) => {
931 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
932 }
933 }
934 }
935}
936
937impl From<ProviderError> for EthFilterError {
938 fn from(err: ProviderError) -> Self {
939 Self::EthAPIError(err.into())
940 }
941}
942
943struct ReceiptBlockResult<P>
946where
947 P: ReceiptProvider + BlockReader,
948{
949 receipts: Arc<Vec<ProviderReceipt<P>>>,
951 recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
953 header: SealedHeader<<P as HeaderProvider>::Header>,
955}
956
957enum RangeMode<
959 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
960 + EthApiTypes
961 + LoadReceipt
962 + EthBlocks
963 + 'static,
964> {
965 Cached(CachedMode<Eth>),
967 Range(RangeBlockMode<Eth>),
969}
970
971impl<
972 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
973 + EthApiTypes
974 + LoadReceipt
975 + EthBlocks
976 + 'static,
977 > RangeMode<Eth>
978{
979 fn new(
981 filter_inner: Arc<EthFilterInner<Eth>>,
982 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
983 from_block: u64,
984 to_block: u64,
985 max_headers_range: u64,
986 chain_tip: u64,
987 ) -> Self {
988 let block_count = to_block - from_block + 1;
989 let distance_from_tip = chain_tip.saturating_sub(to_block);
990
991 let use_cached_mode =
993 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
994
995 if use_cached_mode && !sealed_headers.is_empty() {
996 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
997 } else {
998 Self::Range(RangeBlockMode {
999 filter_inner,
1000 iter: sealed_headers.into_iter().peekable(),
1001 next: VecDeque::new(),
1002 max_range: max_headers_range as usize,
1003 pending_tasks: FuturesOrdered::new(),
1004 })
1005 }
1006 }
1007
1008 const fn should_use_cached_mode(
1010 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1011 block_count: u64,
1012 distance_from_tip: u64,
1013 ) -> bool {
1014 let bloom_matches = headers.len();
1016
1017 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1019
1020 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1021 }
1022
1023 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1025 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1027 return CACHED_MODE_BLOCK_THRESHOLD;
1028 }
1029
1030 match bloom_matches {
1031 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1032 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1033 _ => CACHED_MODE_BLOCK_THRESHOLD,
1034 }
1035 }
1036
1037 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1039 match self {
1040 Self::Cached(cached) => cached.next().await,
1041 Self::Range(range) => range.next().await,
1042 }
1043 }
1044}
1045
1046struct CachedMode<
1048 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1049 + EthApiTypes
1050 + LoadReceipt
1051 + EthBlocks
1052 + 'static,
1053> {
1054 filter_inner: Arc<EthFilterInner<Eth>>,
1055 headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1056}
1057
1058impl<
1059 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1060 + EthApiTypes
1061 + LoadReceipt
1062 + EthBlocks
1063 + 'static,
1064 > CachedMode<Eth>
1065{
1066 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1067 for header in self.headers_iter.by_ref() {
1068 if let Some((receipts, maybe_block)) =
1070 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1071 {
1072 return Ok(Some(ReceiptBlockResult {
1073 receipts,
1074 recovered_block: maybe_block,
1075 header,
1076 }));
1077 }
1078 }
1079
1080 Ok(None) }
1082}
1083
1084type ReceiptFetchFuture<P> =
1086 Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1087
1088struct RangeBlockMode<
1090 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1091 + EthApiTypes
1092 + LoadReceipt
1093 + EthBlocks
1094 + 'static,
1095> {
1096 filter_inner: Arc<EthFilterInner<Eth>>,
1097 iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1098 next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1099 max_range: usize,
1100 pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1102}
1103
1104impl<
1105 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1106 + EthApiTypes
1107 + LoadReceipt
1108 + EthBlocks
1109 + 'static,
1110 > RangeBlockMode<Eth>
1111{
1112 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1113 loop {
1114 if let Some(result) = self.next.pop_front() {
1116 return Ok(Some(result));
1117 }
1118
1119 if let Some(task_result) = self.pending_tasks.next().await {
1121 self.next.extend(task_result?);
1122 continue;
1123 }
1124
1125 let Some(next_header) = self.iter.next() else {
1127 return Ok(None);
1129 };
1130
1131 let mut range_headers = Vec::with_capacity(self.max_range);
1132 range_headers.push(next_header);
1133
1134 while range_headers.len() < self.max_range {
1136 let Some(peeked) = self.iter.peek() else { break };
1137 let Some(last_header) = range_headers.last() else { break };
1138
1139 let expected_next = last_header.number() + 1;
1140 if peeked.number() != expected_next {
1141 trace!(
1142 target: "rpc::eth::filter",
1143 last_block = last_header.number(),
1144 next_block = peeked.number(),
1145 expected = expected_next,
1146 range_size = range_headers.len(),
1147 "Non-consecutive block detected, stopping range collection"
1148 );
1149 break; }
1151
1152 let Some(next_header) = self.iter.next() else { break };
1153 range_headers.push(next_header);
1154 }
1155
1156 let remaining_headers = self.iter.len() + range_headers.len();
1158 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1159 self.spawn_parallel_tasks(range_headers);
1160 } else {
1162 if let Some(result) = self.process_small_range(range_headers).await? {
1164 return Ok(Some(result));
1165 }
1166 }
1168 }
1169 }
1170
1171 async fn process_small_range(
1175 &mut self,
1176 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1177 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1178 for header in range_headers {
1180 let (maybe_block, maybe_receipts) = self
1182 .filter_inner
1183 .eth_cache()
1184 .maybe_cached_block_and_receipts(header.hash())
1185 .await?;
1186
1187 let receipts = match maybe_receipts {
1188 Some(receipts) => receipts,
1189 None => {
1190 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1192 Some(receipts) => Arc::new(receipts),
1193 None => continue, }
1195 }
1196 };
1197
1198 if !receipts.is_empty() {
1199 self.next.push_back(ReceiptBlockResult {
1200 receipts,
1201 recovered_block: maybe_block,
1202 header,
1203 });
1204 }
1205 }
1206
1207 Ok(self.next.pop_front())
1208 }
1209
1210 fn spawn_parallel_tasks(
1215 &mut self,
1216 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1217 ) {
1218 let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1220 let header_chunks = range_headers
1221 .into_iter()
1222 .chunks(chunk_size)
1223 .into_iter()
1224 .map(|chunk| chunk.collect::<Vec<_>>())
1225 .collect::<Vec<_>>();
1226
1227 for chunk_headers in header_chunks {
1229 let filter_inner = self.filter_inner.clone();
1230 let chunk_task = Box::pin(async move {
1231 let chunk_task = tokio::task::spawn_blocking(move || {
1232 let mut chunk_results = Vec::new();
1233
1234 for header in chunk_headers {
1235 let receipts = match filter_inner
1238 .provider()
1239 .receipts_by_block(header.hash().into())?
1240 {
1241 Some(receipts) => Arc::new(receipts),
1242 None => continue, };
1244
1245 if !receipts.is_empty() {
1246 chunk_results.push(ReceiptBlockResult {
1247 receipts,
1248 recovered_block: None,
1249 header,
1250 });
1251 }
1252 }
1253
1254 Ok(chunk_results)
1255 });
1256
1257 match chunk_task.await {
1259 Ok(Ok(chunk_results)) => Ok(chunk_results),
1260 Ok(Err(e)) => Err(e),
1261 Err(join_err) => {
1262 trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1263 Err(EthFilterError::InternalError)
1264 }
1265 }
1266 });
1267
1268 self.pending_tasks.push_back(chunk_task);
1269 }
1270 }
1271}
1272
1273#[cfg(test)]
1274mod tests {
1275 use super::*;
1276 use crate::{eth::EthApi, EthApiBuilder};
1277 use alloy_network::Ethereum;
1278 use alloy_primitives::FixedBytes;
1279 use rand::Rng;
1280 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1281 use reth_ethereum_primitives::TxType;
1282 use reth_evm_ethereum::EthEvmConfig;
1283 use reth_network_api::noop::NoopNetwork;
1284 use reth_provider::test_utils::MockEthProvider;
1285 use reth_rpc_convert::RpcConverter;
1286 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1287 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1288 use reth_tasks::TokioTaskExecutor;
1289 use reth_testing_utils::generators;
1290 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1291 use std::{collections::VecDeque, sync::Arc};
1292
/// Checks that [`BlockRangeInclusiveIter`] splits a random inclusive block range into
/// back-to-back sub-ranges.
///
/// The first sub-range must begin at the range start and end at `start + step`, capped at the
/// end of the range. Every following sub-range must begin right after the previous one ended,
/// and the last one must end exactly at the end of the range.
#[test]
fn test_block_range_iter() {
    let mut rng = generators::rng();

    // Widening conversions only, so the sum below stays far away from `u64::MAX`.
    let start = u64::from(rng.random::<u32>());
    let end = start.saturating_add(u64::from(rng.random::<u32>()));
    let step = u64::from(rng.random::<u16>());

    let range = start..=end;
    let last_block = *range.end();

    let mut sub_ranges = BlockRangeInclusiveIter::new(range, step);

    let (first_from, first_to) = sub_ranges.next().unwrap();
    assert_eq!(first_from, start);
    assert_eq!(first_to, (first_from + step).min(last_block));

    // Thread the previous upper bound through the remaining sub-ranges.
    let final_to = sub_ranges.fold(first_to, |previous_to, (from, to)| {
        assert_eq!(from, previous_to + 1);
        to
    });

    assert_eq!(final_to, last_block);
}
1314
1315 #[expect(clippy::type_complexity)]
1317 fn build_test_eth_api(
1318 provider: MockEthProvider,
1319 ) -> EthApi<
1320 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1321 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1322 > {
1323 EthApiBuilder::new(
1324 provider.clone(),
1325 testing_pool(),
1326 NoopNetwork::default(),
1327 EthEvmConfig::new(provider.chain_spec()),
1328 )
1329 .build()
1330 }
1331
1332 #[tokio::test]
1333 async fn test_range_block_mode_empty_range() {
1334 let provider = MockEthProvider::default();
1335 let eth_api = build_test_eth_api(provider);
1336
1337 let eth_filter = super::EthFilter::new(
1338 eth_api,
1339 EthFilterConfig::default(),
1340 Box::new(TokioTaskExecutor::default()),
1341 );
1342 let filter_inner = eth_filter.inner;
1343
1344 let headers = vec![];
1345 let max_range = 100;
1346
1347 let mut range_mode = RangeBlockMode {
1348 filter_inner,
1349 iter: headers.into_iter().peekable(),
1350 next: VecDeque::new(),
1351 max_range,
1352 pending_tasks: FuturesOrdered::new(),
1353 };
1354
1355 let result = range_mode.next().await;
1356 assert!(result.is_ok());
1357 assert!(result.unwrap().is_none());
1358 }
1359
1360 #[tokio::test]
1361 async fn test_range_block_mode_queued_results_priority() {
1362 let provider = MockEthProvider::default();
1363 let eth_api = build_test_eth_api(provider);
1364
1365 let eth_filter = super::EthFilter::new(
1366 eth_api,
1367 EthFilterConfig::default(),
1368 Box::new(TokioTaskExecutor::default()),
1369 );
1370 let filter_inner = eth_filter.inner;
1371
1372 let headers = vec![
1373 SealedHeader::new(
1374 alloy_consensus::Header { number: 100, ..Default::default() },
1375 FixedBytes::random(),
1376 ),
1377 SealedHeader::new(
1378 alloy_consensus::Header { number: 101, ..Default::default() },
1379 FixedBytes::random(),
1380 ),
1381 ];
1382
1383 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1385 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1386
1387 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1389 tx_type: TxType::Legacy,
1390 cumulative_gas_used: 100_000,
1391 logs: vec![],
1392 success: true,
1393 };
1394 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1395 tx_type: TxType::Eip1559,
1396 cumulative_gas_used: 200_000,
1397 logs: vec![],
1398 success: true,
1399 };
1400 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1401 tx_type: TxType::Eip2930,
1402 cumulative_gas_used: 150_000,
1403 logs: vec![],
1404 success: false, };
1406
1407 let mock_result_1 = ReceiptBlockResult {
1408 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1409 recovered_block: None,
1410 header: SealedHeader::new(
1411 alloy_consensus::Header { number: 42, ..Default::default() },
1412 expected_block_hash_1,
1413 ),
1414 };
1415
1416 let mock_result_2 = ReceiptBlockResult {
1417 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1418 recovered_block: None,
1419 header: SealedHeader::new(
1420 alloy_consensus::Header { number: 43, ..Default::default() },
1421 expected_block_hash_2,
1422 ),
1423 };
1424
1425 let mut range_mode = RangeBlockMode {
1426 filter_inner,
1427 iter: headers.into_iter().peekable(),
1428 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1430 pending_tasks: FuturesOrdered::new(),
1431 };
1432
1433 let result1 = range_mode.next().await;
1435 assert!(result1.is_ok());
1436 let receipt_result1 = result1.unwrap().unwrap();
1437 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1438 assert_eq!(receipt_result1.header.number, 42);
1439
1440 assert_eq!(receipt_result1.receipts.len(), 2);
1442 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1443 assert_eq!(
1444 receipt_result1.receipts[0].cumulative_gas_used,
1445 mock_receipt_1.cumulative_gas_used
1446 );
1447 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1448 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1449 assert_eq!(
1450 receipt_result1.receipts[1].cumulative_gas_used,
1451 mock_receipt_2.cumulative_gas_used
1452 );
1453 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1454
1455 let result2 = range_mode.next().await;
1457 assert!(result2.is_ok());
1458 let receipt_result2 = result2.unwrap().unwrap();
1459 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1460 assert_eq!(receipt_result2.header.number, 43);
1461
1462 assert_eq!(receipt_result2.receipts.len(), 1);
1464 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1465 assert_eq!(
1466 receipt_result2.receipts[0].cumulative_gas_used,
1467 mock_receipt_3.cumulative_gas_used
1468 );
1469 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1470
1471 assert!(range_mode.next.is_empty());
1473
1474 let result3 = range_mode.next().await;
1475 assert!(result3.is_ok());
1476 }
1477
1478 #[tokio::test]
1479 async fn test_range_block_mode_single_block_no_receipts() {
1480 let provider = MockEthProvider::default();
1481 let eth_api = build_test_eth_api(provider);
1482
1483 let eth_filter = super::EthFilter::new(
1484 eth_api,
1485 EthFilterConfig::default(),
1486 Box::new(TokioTaskExecutor::default()),
1487 );
1488 let filter_inner = eth_filter.inner;
1489
1490 let headers = vec![SealedHeader::new(
1491 alloy_consensus::Header { number: 100, ..Default::default() },
1492 FixedBytes::random(),
1493 )];
1494
1495 let mut range_mode = RangeBlockMode {
1496 filter_inner,
1497 iter: headers.into_iter().peekable(),
1498 next: VecDeque::new(),
1499 max_range: 100,
1500 pending_tasks: FuturesOrdered::new(),
1501 };
1502
1503 let result = range_mode.next().await;
1504 assert!(result.is_ok());
1505 }
1506
1507 #[tokio::test]
1508 async fn test_range_block_mode_provider_receipts() {
1509 let provider = MockEthProvider::default();
1510
1511 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1512 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1513 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1514
1515 let block_hash_1 = FixedBytes::random();
1516 let block_hash_2 = FixedBytes::random();
1517 let block_hash_3 = FixedBytes::random();
1518
1519 provider.add_header(block_hash_1, header_1.clone());
1520 provider.add_header(block_hash_2, header_2.clone());
1521 provider.add_header(block_hash_3, header_3.clone());
1522
1523 let mock_log = alloy_primitives::Log {
1525 address: alloy_primitives::Address::ZERO,
1526 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1527 };
1528
1529 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1530 tx_type: TxType::Legacy,
1531 cumulative_gas_used: 21_000,
1532 logs: vec![mock_log.clone()],
1533 success: true,
1534 };
1535 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1536 tx_type: TxType::Eip1559,
1537 cumulative_gas_used: 42_000,
1538 logs: vec![mock_log.clone()],
1539 success: true,
1540 };
1541 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1542 tx_type: TxType::Eip2930,
1543 cumulative_gas_used: 30_000,
1544 logs: vec![mock_log.clone()],
1545 success: false,
1546 };
1547
1548 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1549 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1550
1551 let eth_api = build_test_eth_api(provider);
1552
1553 let eth_filter = super::EthFilter::new(
1554 eth_api,
1555 EthFilterConfig::default(),
1556 Box::new(TokioTaskExecutor::default()),
1557 );
1558 let filter_inner = eth_filter.inner;
1559
1560 let headers = vec![
1561 SealedHeader::new(header_1, block_hash_1),
1562 SealedHeader::new(header_2, block_hash_2),
1563 SealedHeader::new(header_3, block_hash_3),
1564 ];
1565
1566 let mut range_mode = RangeBlockMode {
1567 filter_inner,
1568 iter: headers.into_iter().peekable(),
1569 next: VecDeque::new(),
1570 max_range: 3, pending_tasks: FuturesOrdered::new(),
1572 };
1573
1574 let result = range_mode.next().await;
1576 assert!(result.is_ok());
1577 let receipt_result = result.unwrap().unwrap();
1578
1579 assert_eq!(receipt_result.header.hash(), block_hash_1);
1580 assert_eq!(receipt_result.header.number, 100);
1581 assert_eq!(receipt_result.receipts.len(), 2);
1582
1583 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1585 assert_eq!(
1586 receipt_result.receipts[0].cumulative_gas_used,
1587 receipt_100_1.cumulative_gas_used
1588 );
1589 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1590
1591 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1592 assert_eq!(
1593 receipt_result.receipts[1].cumulative_gas_used,
1594 receipt_100_2.cumulative_gas_used
1595 );
1596 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1597
1598 let result2 = range_mode.next().await;
1600 assert!(result2.is_ok());
1601 let receipt_result2 = result2.unwrap().unwrap();
1602
1603 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1604 assert_eq!(receipt_result2.header.number, 101);
1605 assert_eq!(receipt_result2.receipts.len(), 1);
1606
1607 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1609 assert_eq!(
1610 receipt_result2.receipts[0].cumulative_gas_used,
1611 receipt_101_1.cumulative_gas_used
1612 );
1613 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1614
1615 let result3 = range_mode.next().await;
1617 assert!(result3.is_ok());
1618 assert!(result3.unwrap().is_none());
1619 }
1620
1621 #[tokio::test]
1622 async fn test_range_block_mode_iterator_exhaustion() {
1623 let provider = MockEthProvider::default();
1624
1625 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1626 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1627
1628 let block_hash_100 = FixedBytes::random();
1629 let block_hash_101 = FixedBytes::random();
1630
1631 provider.add_header(block_hash_100, header_100.clone());
1633 provider.add_header(block_hash_101, header_101.clone());
1634
1635 let mock_receipt = reth_ethereum_primitives::Receipt {
1637 tx_type: TxType::Legacy,
1638 cumulative_gas_used: 21_000,
1639 logs: vec![],
1640 success: true,
1641 };
1642 provider.add_receipts(100, vec![mock_receipt.clone()]);
1643 provider.add_receipts(101, vec![mock_receipt.clone()]);
1644
1645 let eth_api = build_test_eth_api(provider);
1646
1647 let eth_filter = super::EthFilter::new(
1648 eth_api,
1649 EthFilterConfig::default(),
1650 Box::new(TokioTaskExecutor::default()),
1651 );
1652 let filter_inner = eth_filter.inner;
1653
1654 let headers = vec![
1655 SealedHeader::new(header_100, block_hash_100),
1656 SealedHeader::new(header_101, block_hash_101),
1657 ];
1658
1659 let mut range_mode = RangeBlockMode {
1660 filter_inner,
1661 iter: headers.into_iter().peekable(),
1662 next: VecDeque::new(),
1663 max_range: 1,
1664 pending_tasks: FuturesOrdered::new(),
1665 };
1666
1667 let result1 = range_mode.next().await;
1668 assert!(result1.is_ok());
1669 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1674 assert!(result2.is_ok());
1675 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1679
1680 let result3 = range_mode.next().await;
1682 assert!(result3.is_ok());
1683 assert!(result3.unwrap().is_none());
1684 }
1685
1686 #[tokio::test]
1687 async fn test_cached_mode_with_mock_receipts() {
1688 let test_hash = FixedBytes::from([42u8; 32]);
1690 let test_block_number = 100u64;
1691 let test_header = SealedHeader::new(
1692 alloy_consensus::Header {
1693 number: test_block_number,
1694 gas_used: 50_000,
1695 ..Default::default()
1696 },
1697 test_hash,
1698 );
1699
1700 let mock_log = alloy_primitives::Log {
1702 address: alloy_primitives::Address::ZERO,
1703 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1704 };
1705
1706 let mock_receipt = reth_ethereum_primitives::Receipt {
1707 tx_type: TxType::Legacy,
1708 cumulative_gas_used: 21_000,
1709 logs: vec![mock_log],
1710 success: true,
1711 };
1712
1713 let provider = MockEthProvider::default();
1714 provider.add_header(test_hash, test_header.header().clone());
1715 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1716
1717 let eth_api = build_test_eth_api(provider);
1718 let eth_filter = super::EthFilter::new(
1719 eth_api,
1720 EthFilterConfig::default(),
1721 Box::new(TokioTaskExecutor::default()),
1722 );
1723 let filter_inner = eth_filter.inner;
1724
1725 let headers = vec![test_header.clone()];
1726
1727 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1728
1729 let result = cached_mode.next().await.expect("next should succeed");
1731 let receipt_block_result = result.expect("should have receipt result");
1732 assert_eq!(receipt_block_result.header.hash(), test_hash);
1733 assert_eq!(receipt_block_result.header.number, test_block_number);
1734 assert_eq!(receipt_block_result.receipts.len(), 1);
1735 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1736 assert_eq!(
1737 receipt_block_result.receipts[0].cumulative_gas_used,
1738 mock_receipt.cumulative_gas_used
1739 );
1740 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1741
1742 let result2 = cached_mode.next().await;
1744 assert!(result2.is_ok());
1745 assert!(result2.unwrap().is_none());
1746 }
1747
1748 #[tokio::test]
1749 async fn test_cached_mode_empty_headers() {
1750 let provider = MockEthProvider::default();
1751 let eth_api = build_test_eth_api(provider);
1752
1753 let eth_filter = super::EthFilter::new(
1754 eth_api,
1755 EthFilterConfig::default(),
1756 Box::new(TokioTaskExecutor::default()),
1757 );
1758 let filter_inner = eth_filter.inner;
1759
1760 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1761
1762 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1763
1764 let result = cached_mode.next().await.expect("next should succeed");
1766 assert!(result.is_none());
1767 }
1768
1769 #[tokio::test]
1770 async fn test_non_consecutive_headers_after_bloom_filter() {
1771 let provider = MockEthProvider::default();
1772
1773 let mut expected_hashes = vec![];
1775 let mut prev_hash = alloy_primitives::B256::default();
1776
1777 use alloy_consensus::TxLegacy;
1779 use reth_ethereum_primitives::{TransactionSigned, TxType};
1780
1781 let tx_inner = TxLegacy {
1782 chain_id: Some(1),
1783 nonce: 0,
1784 gas_price: 21_000,
1785 gas_limit: 21_000,
1786 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1787 value: alloy_primitives::U256::ZERO,
1788 input: alloy_primitives::Bytes::new(),
1789 };
1790 let signature = alloy_primitives::Signature::test_signature();
1791 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1792
1793 for i in 100u64..=103 {
1794 let header = alloy_consensus::Header {
1795 number: i,
1796 parent_hash: prev_hash,
1797 logs_bloom: if i == 100 || i == 102 {
1799 alloy_primitives::Bloom::from([1u8; 256])
1800 } else {
1801 alloy_primitives::Bloom::default()
1802 },
1803 ..Default::default()
1804 };
1805
1806 let hash = header.hash_slow();
1807 expected_hashes.push(hash);
1808 prev_hash = hash;
1809
1810 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1812
1813 let block = reth_ethereum_primitives::Block {
1814 header,
1815 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1816 };
1817 provider.add_block(hash, block);
1818 }
1819
1820 let mock_log = alloy_primitives::Log {
1822 address: alloy_primitives::Address::ZERO,
1823 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1824 };
1825
1826 let receipt = reth_ethereum_primitives::Receipt {
1827 tx_type: TxType::Legacy,
1828 cumulative_gas_used: 21_000,
1829 logs: vec![mock_log],
1830 success: true,
1831 };
1832
1833 provider.add_receipts(100, vec![receipt.clone()]);
1834 provider.add_receipts(101, vec![]);
1835 provider.add_receipts(102, vec![receipt.clone()]);
1836 provider.add_receipts(103, vec![]);
1837
1838 use reth_db_api::models::StoredBlockBodyIndices;
1840 provider
1841 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1842 provider
1843 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1844 provider
1845 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1846 provider
1847 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1848
1849 let eth_api = build_test_eth_api(provider);
1850 let eth_filter = EthFilter::new(
1851 eth_api,
1852 EthFilterConfig::default(),
1853 Box::new(TokioTaskExecutor::default()),
1854 );
1855
1856 let filter = Filter::default();
1858
1859 let logs = eth_filter
1861 .inner
1862 .clone()
1863 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1864 .await
1865 .expect("should succeed");
1866
1867 assert_eq!(logs.len(), 2);
1869
1870 assert_eq!(logs[0].block_number, Some(100));
1871 assert_eq!(logs[1].block_number, Some(102));
1872
1873 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1877}