1use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8 PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12 future::TryFutureExt,
13 stream::{FuturesOrdered, StreamExt},
14 Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21 helpers::{EthBlocks, LoadReceipt},
22 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23 RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32 ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37 collections::{HashMap, VecDeque},
38 fmt,
39 iter::{Peekable, StepBy},
40 ops::RangeInclusive,
41 pin::Pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 sync::{mpsc::Receiver, oneshot, Mutex},
47 time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53 Eth: FullEthApiTypes
54 + RpcNodeCoreExt<Provider: BlockIdReader>
55 + LoadReceipt
56 + EthBlocks
57 + 'static,
58{
59 fn logs(
61 &self,
62 filter: Filter,
63 limits: QueryLimits,
64 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65 trace!(target: "rpc::eth", "Serving eth_getLogs");
66 self.logs_for_filter(filter, limits).map_err(|e| e.into())
67 }
68}
69
/// Base threshold (in blocks) used when deciding between cached and range mode processing.
///
/// Both the size of the queried range and its distance from the chain tip are compared against
/// this value (or an adjusted fraction of it).
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// Number of bloom filter matches above which the cached mode threshold is halved.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// Number of bloom filter matches above which the cached mode threshold is reduced to 3/4.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Block ranges of at most this many blocks never get a bloom-match based threshold adjustment.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// The maximum number of headers we read at once when handling a range filter.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Number of remaining matching headers at or above which range mode fetches receipts in
/// parallel tasks.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Number of chunks a header range is divided into for parallel receipt fetching.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
/// `Eth` filter RPC implementation.
///
/// This is a thin handle around shared state: cloning it only clones the inner [`Arc`].
pub struct EthFilter<Eth: EthApiTypes> {
    /// All nested fields bundled together
    inner: Arc<EthFilterInner<Eth>>,
}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101 Eth: EthApiTypes,
102{
103 fn clone(&self) -> Self {
104 Self { inner: self.inner.clone() }
105 }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110 Eth: EthApiTypes + 'static,
111{
112 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
140 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141 config;
142 let inner = EthFilterInner {
143 eth_api,
144 active_filters: ActiveFilters::new(),
145 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146 max_headers_range: MAX_HEADERS_RANGE,
147 task_spawner,
148 stale_filter_ttl,
149 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150 };
151
152 let eth_filter = Self { inner: Arc::new(inner) };
153
154 let this = eth_filter.clone();
155 eth_filter.inner.task_spawner.spawn_critical(
156 "eth-filters_stale-filters-clean",
157 Box::pin(async move {
158 this.watch_and_clear_stale_filters().await;
159 }),
160 );
161
162 eth_filter
163 }
164
165 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167 &self.inner.active_filters
168 }
169
170 async fn watch_and_clear_stale_filters(&self) {
173 let mut interval = tokio::time::interval_at(
174 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175 self.inner.stale_filter_ttl,
176 );
177 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178 loop {
179 interval.tick().await;
180 self.clear_stale_filters(Instant::now()).await;
181 }
182 }
183
184 pub async fn clear_stale_filters(&self, now: Instant) {
187 trace!(target: "rpc::eth", "clear stale filters");
188 let mut filters = self.active_filters().inner.lock().await;
189 filters.retain(|id, filter| {
190 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192 if !is_valid {
193 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194 }
195
196 is_valid
197 });
198 filters.shrink_to_fit();
199 }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205 + RpcNodeCoreExt
206 + LoadReceipt
207 + EthBlocks
208 + 'static,
209{
    /// Access the underlying provider.
    fn provider(&self) -> &Eth::Provider {
        self.inner.eth_api.provider()
    }
214
    /// Access the underlying pool.
    fn pool(&self) -> &Eth::Pool {
        self.inner.eth_api.pool()
    }
219
220 pub async fn filter_changes(
222 &self,
223 id: FilterId,
224 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225 let info = self.provider().chain_info()?;
226 let best_number = info.best_number;
227
228 let (start_block, kind) = {
231 let mut filters = self.inner.active_filters.inner.lock().await;
232 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234 if filter.block > best_number {
235 return Ok(FilterChanges::Empty)
237 }
238
239 let mut block = best_number + 1;
243 std::mem::swap(&mut filter.block, &mut block);
244 filter.last_poll_timestamp = Instant::now();
245
246 (block, filter.kind.clone())
247 };
248
249 match kind {
250 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251 FilterKind::Block => {
252 let end_block = best_number + 1;
255 let block_hashes =
256 self.provider().canonical_hashes_range(start_block, end_block).map_err(
257 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258 )?;
259 Ok(FilterChanges::Hashes(block_hashes))
260 }
261 FilterKind::Log(filter) => {
262 let (from_block_number, to_block_number) = match filter.block_option {
263 FilterBlockOption::Range { from_block, to_block } => {
264 let from = from_block
265 .map(|num| self.provider().convert_block_number(num))
266 .transpose()?
267 .flatten();
268 let to = to_block
269 .map(|num| self.provider().convert_block_number(num))
270 .transpose()?
271 .flatten();
272 logs_utils::get_filter_block_range(from, to, start_block, info)?
273 }
274 FilterBlockOption::AtBlockHash(_) => {
275 (start_block, best_number)
279 }
280 };
281 let logs = self
282 .inner
283 .clone()
284 .get_logs_in_block_range(
285 *filter,
286 from_block_number,
287 to_block_number,
288 self.inner.query_limits,
289 )
290 .await?;
291 Ok(FilterChanges::Logs(logs))
292 }
293 }
294 }
295
296 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
302 let filter = {
303 let mut filters = self.inner.active_filters.inner.lock().await;
304 let filter =
305 filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
306 if let FilterKind::Log(ref inner_filter) = filter.kind {
307 filter.last_poll_timestamp = Instant::now();
308 *inner_filter.clone()
309 } else {
310 return Err(EthFilterError::FilterNotFound(id))
312 }
313 };
314
315 self.logs_for_filter(filter, self.inner.query_limits).await
316 }
317
318 async fn logs_for_filter(
320 &self,
321 filter: Filter,
322 limits: QueryLimits,
323 ) -> Result<Vec<Log>, EthFilterError> {
324 self.inner.clone().logs_for_filter(filter, limits).await
325 }
326}
327
328#[async_trait]
329impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
330where
331 Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
332{
333 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
335 trace!(target: "rpc::eth", "Serving eth_newFilter");
336 self.inner
337 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
338 .await
339 }
340
341 async fn new_block_filter(&self) -> RpcResult<FilterId> {
343 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
344 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
345 }
346
347 async fn new_pending_transaction_filter(
349 &self,
350 kind: Option<PendingTransactionFilterKind>,
351 ) -> RpcResult<FilterId> {
352 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
353
354 let transaction_kind = match kind.unwrap_or_default() {
355 PendingTransactionFilterKind::Hashes => {
356 let receiver = self.pool().pending_transactions_listener();
357 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
358 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
359 }
360 PendingTransactionFilterKind::Full => {
361 let stream = self.pool().new_pending_pool_transactions_listener();
362 let full_txs_receiver = FullTransactionsReceiver::new(
363 stream,
364 dyn_clone::clone(self.inner.eth_api.converter()),
365 );
366 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
367 full_txs_receiver,
368 )))
369 }
370 };
371
372 self.inner.install_filter(transaction_kind).await
374 }
375
376 async fn filter_changes(
378 &self,
379 id: FilterId,
380 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
381 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
382 Ok(Self::filter_changes(self, id).await?)
383 }
384
385 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
391 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
392 Ok(Self::filter_logs(self, id).await?)
393 }
394
395 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
397 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
398 let mut filters = self.inner.active_filters.inner.lock().await;
399 if filters.remove(&id).is_some() {
400 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
401 Ok(true)
402 } else {
403 Ok(false)
404 }
405 }
406
407 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
411 trace!(target: "rpc::eth", "Serving eth_getLogs");
412 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
413 }
414}
415
416impl<Eth> std::fmt::Debug for EthFilter<Eth>
417where
418 Eth: EthApiTypes,
419{
420 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421 f.debug_struct("EthFilter").finish_non_exhaustive()
422 }
423}
424
#[derive(Debug)]
/// Container type for the shared state of `EthFilter`.
struct EthFilterInner<Eth: EthApiTypes> {
    /// Inner `eth` API implementation.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Provides ids to identify filters
    id_provider: Arc<dyn IdProvider>,
    /// Default limits applied to log queries.
    query_limits: QueryLimits,
    /// Maximum number of headers to read at once for range filters.
    max_headers_range: u64,
    /// The type that can spawn tasks.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration since the last filter poll, after which the filter is considered stale
    stale_filter_ttl: Duration,
}
443
444impl<Eth> EthFilterInner<Eth>
445where
446 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
447 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
448 + LoadReceipt
449 + EthBlocks
450 + 'static,
451{
    /// Access the underlying provider.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }
456
    /// Access the underlying [`EthStateCache`].
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
461
    /// Returns all logs matching the filter.
    ///
    /// A filter pinned to a block hash is answered from that single block. A range filter is
    /// resolved against the current chain info; a range starting at the pending block is
    /// answered from the local pending block when one is available and ahead of the best block.
    ///
    /// # Errors
    ///
    /// - the header for a requested block hash cannot be found
    /// - [`EthFilterError::BlockRangeExceedsHead`] if the resolved `to` block is beyond the best
    ///   block
    /// - any error of the underlying range query
    async fn logs_for_filter(
        self: Arc<Self>,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        match filter.block_option {
            FilterBlockOption::AtBlockHash(block_hash) => {
                // receipts (and the block, if it is available as well) come from the state cache
                let Some((receipts, maybe_block)) =
                    self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
                else {
                    return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
                };

                // take the header from the block if we have it, otherwise ask the provider
                let header = if let Some(block) = &maybe_block {
                    block.header().clone()
                } else {
                    self.provider()
                        .header_by_hash_or_number(block_hash.into())?
                        .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?
                };

                let block_num_hash = BlockNumHash::new(header.number(), block_hash);

                let mut all_logs = Vec::new();
                append_matching_block_logs(
                    &mut all_logs,
                    maybe_block
                        .map(ProviderOrBlock::Block)
                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                    &filter,
                    block_num_hash,
                    &receipts,
                    // NOTE(review): presumably the `removed` flag of the emitted logs — confirm
                    // against `append_matching_block_logs`
                    false,
                    header.timestamp(),
                )?;

                Ok(all_logs)
            }
            FilterBlockOption::Range { from_block, to_block } => {
                // special handling for ranges that start at the pending block
                if from_block.is_some_and(|b| b.is_pending()) {
                    let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
                    if !(to_block.is_pending() || to_block.is_number()) {
                        // any other tag as upper bound yields no logs for a pending lower bound
                        return Ok(Vec::new());
                    }
                    // a failure to obtain the pending block is not an error here, we fall
                    // through to the regular range handling instead
                    if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
                        if let BlockNumberOrTag::Number(to_block) = to_block &&
                            to_block < pending_block.block.number()
                        {
                            // the requested upper bound is below the pending block
                            return Ok(Vec::new());
                        }

                        let info = self.provider().chain_info()?;
                        if pending_block.block.number() > info.best_number {
                            // the pending block is ahead of the best block: serve its logs
                            let mut all_logs = Vec::new();
                            let timestamp = pending_block.block.timestamp();
                            let block_num_hash = pending_block.block.num_hash();
                            append_matching_block_logs(
                                &mut all_logs,
                                ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
                                &filter,
                                block_num_hash,
                                &pending_block.receipts,
                                false,
                                timestamp,
                            )?;
                            return Ok(all_logs);
                        }
                    }
                }

                let info = self.provider().chain_info()?;
                let start_block = info.best_number;
                let from = from_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();
                let to = to_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();

                // an upper bound beyond the best block is rejected
                if let Some(t) = to &&
                    t > info.best_number
                {
                    return Err(EthFilterError::BlockRangeExceedsHead);
                }

                // a lower bound beyond the best block simply has no logs yet
                if let Some(f) = from &&
                    f > info.best_number
                {
                    return Ok(Vec::new());
                }

                let (from_block_number, to_block_number) =
                    logs_utils::get_filter_block_range(from, to, start_block, info)?;

                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
                    .await
            }
        }
    }
573
574 async fn install_filter(
576 &self,
577 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
578 ) -> RpcResult<FilterId> {
579 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
580 let subscription_id = self.id_provider.next_id();
581
582 let id = match subscription_id {
583 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
584 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
585 };
586 let mut filters = self.active_filters.inner.lock().await;
587 filters.insert(
588 id.clone(),
589 ActiveFilter {
590 block: last_poll_block_number,
591 last_poll_timestamp: Instant::now(),
592 kind,
593 },
594 );
595 Ok(id)
596 }
597
598 async fn get_logs_in_block_range(
604 self: Arc<Self>,
605 filter: Filter,
606 from_block: u64,
607 to_block: u64,
608 limits: QueryLimits,
609 ) -> Result<Vec<Log>, EthFilterError> {
610 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
611
612 if to_block < from_block {
614 return Err(EthFilterError::InvalidBlockRangeParams)
615 }
616
617 if let Some(max_blocks_per_filter) =
618 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
619 {
620 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
621 }
622
623 let (tx, rx) = oneshot::channel();
624 let this = self.clone();
625 self.task_spawner.spawn_blocking(Box::pin(async move {
626 let res =
627 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
628 let _ = tx.send(res);
629 }));
630
631 rx.await.map_err(|_| EthFilterError::InternalError)?
632 }
633
    /// Returns all logs in the given _inclusive_ range that match the filter.
    ///
    /// Headers are read in chunks of at most `max_headers_range` and pre-filtered by their logs
    /// bloom; receipts are then fetched only for the matching headers through [`RangeMode`].
    ///
    /// This is spawned on a blocking task by the caller.
    ///
    /// # Errors
    ///
    /// - underlying provider or cache error
    /// - [`EthFilterError::QueryExceedsMaxResults`] if a multi-block query collects more logs
    ///   than `limits.max_logs_per_response`
    async fn get_logs_in_block_range_inner(
        self: Arc<Self>,
        filter: &Filter,
        from_block: u64,
        to_block: u64,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        let mut all_logs = Vec::new();
        let mut matching_headers = Vec::new();

        // the chain tip is needed to decide between cached and range mode below
        let chain_tip = self.provider().best_block_number()?;

        // first collect all headers whose bloom matches the filter
        for (from, to) in
            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
        {
            let headers = self.provider().headers_range(from..=to)?;

            let mut headers_iter = headers.into_iter().peekable();

            while let Some(header) = headers_iter.next() {
                if !filter.matches_bloom(header.logs_bloom()) {
                    continue
                }

                let current_number = header.number();

                let block_hash = match headers_iter.peek() {
                    Some(next_header) if next_header.number() == current_number + 1 => {
                        // the directly following header already carries this block's hash as its
                        // parent hash, which avoids hashing the header
                        next_header.parent_hash()
                    }
                    _ => {
                        // last header of the chunk or a gap: compute the hash
                        header.hash_slow()
                    }
                };

                matching_headers.push(SealedHeader::new(header, block_hash));
            }
        }

        // pick the strategy used to fetch receipts for the matching headers
        let mut range_mode = RangeMode::new(
            self.clone(),
            matching_headers,
            from_block,
            to_block,
            self.max_headers_range,
            chain_tip,
        );

        // extract the logs of every matching block
        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
            range_mode.next().await?
        {
            let num_hash = header.num_hash();
            append_matching_block_logs(
                &mut all_logs,
                recovered_block
                    .map(ProviderOrBlock::Block)
                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                filter,
                num_hash,
                &receipts,
                // NOTE(review): presumably the `removed` flag of the emitted logs — confirm
                // against `append_matching_block_logs`
                false,
                header.timestamp(),
            )?;

            // the log limit only applies to queries spanning more than one block, so a single
            // block can always be returned in full
            let is_multi_block_range = from_block != to_block;
            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
                is_multi_block_range &&
                all_logs.len() > max_logs_per_response
            {
                debug!(
                    target: "rpc::eth::filter",
                    logs_found = all_logs.len(),
                    max_logs_per_response,
                    from_block,
                    to_block = num_hash.number.saturating_sub(1),
                    "Query exceeded max logs per response limit"
                );
                // suggest retrying with the range that ends right before the block that pushed
                // the result over the limit
                return Err(EthFilterError::QueryExceedsMaxResults {
                    max_logs: max_logs_per_response,
                    from_block,
                    to_block: num_hash.number.saturating_sub(1),
                });
            }
        }

        Ok(all_logs)
    }
737}
738
739#[derive(Debug, Clone, Default)]
741pub struct ActiveFilters<T> {
742 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
743}
744
745impl<T> ActiveFilters<T> {
746 pub fn new() -> Self {
748 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
749 }
750}
751
#[derive(Debug)]
/// An installed filter
struct ActiveFilter<T> {
    /// Block number from which the next poll starts fetching changes.
    block: u64,
    /// Last time this filter was polled.
    last_poll_timestamp: Instant,
    /// What kind of filter it is.
    kind: FilterKind<T>,
}
762
763#[derive(Debug, Clone)]
765struct PendingTransactionsReceiver {
766 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
767}
768
769impl PendingTransactionsReceiver {
770 fn new(receiver: Receiver<TxHash>) -> Self {
771 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
772 }
773
774 async fn drain<T>(&self) -> FilterChanges<T> {
776 let mut pending_txs = Vec::new();
777 let mut prepared_stream = self.txs_receiver.lock().await;
778
779 while let Ok(tx_hash) = prepared_stream.try_recv() {
780 pending_txs.push(tx_hash);
781 }
782
783 FilterChanges::Hashes(pending_txs)
785 }
786}
787
788#[derive(Debug, Clone)]
790struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
791 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
792 converter: TxCompat,
793}
794
795impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
796where
797 T: PoolTransaction + 'static,
798 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
799{
800 fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
802 Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
803 }
804
805 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
807 let mut pending_txs = Vec::new();
808 let mut prepared_stream = self.txs_stream.lock().await;
809
810 while let Ok(tx) = prepared_stream.try_recv() {
811 match self.converter.fill_pending(tx.transaction.to_consensus()) {
812 Ok(tx) => pending_txs.push(tx),
813 Err(err) => {
814 error!(target: "rpc",
815 %err,
816 "Failed to fill txn with block context"
817 );
818 }
819 }
820 }
821 FilterChanges::Transactions(pending_txs)
822 }
823}
824
#[async_trait]
/// Abstraction over a full-transactions receiver.
///
/// It lets the receiver be stored as a trait object that depends only on the RPC transaction
/// type `T`, not on the pool transaction or converter types.
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the changes accumulated since the previous drain.
    async fn drain(&self) -> FilterChanges<T>;
}
830
#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
{
    /// Forwards to the `drain` method implemented directly on [`FullTransactionsReceiver`].
    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
        // NOTE(review): this relies on `Self::drain` resolving to the inherent method rather
        // than this trait method (which would recurse) — keep the inherent method in place
        Self::drain(self).await
    }
}
842
843#[derive(Debug, Clone)]
849enum PendingTransactionKind<T> {
850 Hashes(PendingTransactionsReceiver),
851 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
852}
853
854impl<T: 'static> PendingTransactionKind<T> {
855 async fn drain(&self) -> FilterChanges<T> {
856 match self {
857 Self::Hashes(receiver) => receiver.drain().await,
858 Self::FullTransaction(receiver) => receiver.drain().await,
859 }
860 }
861}
862
#[derive(Clone, Debug)]
/// The kind of an installed filter.
enum FilterKind<T> {
    /// A log filter, holding the filter object to match logs against.
    Log(Box<Filter>),
    /// A new-block filter.
    Block,
    /// A pending transaction filter (hashes or full transactions).
    PendingTransaction(PendingTransactionKind<T>),
}
869
#[derive(Debug)]
/// An iterator that yields _inclusive_ block ranges of a given step size.
///
/// Every yielded `(start, end)` pair covers at most `step + 1` blocks; consecutive pairs are
/// adjacent and the last one ends exactly at the end of the input range.
struct BlockRangeInclusiveIter {
    /// Yields the start of every chunk.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the start and the (inclusive) end of a full chunk.
    step: u64,
    /// Inclusive end of the whole range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator that splits `range` into chunks of `step + 1` blocks.
    ///
    /// A `step` of `0` yields one chunk per block. Steps that do not fit the stride type are
    /// clamped, which still yields the whole range as a single chunk.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // every chunk covers `step + 1` blocks because both bounds are inclusive; saturate so
        // that huge steps neither overflow nor truncate on 32-bit targets
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next inclusive `(start, end)` chunk, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start` never exceeds `self.end` because it comes from the stepped range, so the
        // saturated and clamped end is always `>= start`
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
896
#[derive(Debug, thiserror::Error)]
/// Errors that can occur in the filter handler implementation
pub enum EthFilterError {
    /// No matching filter is installed for the given id.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// Invalid block range.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// Block range extends beyond current head.
    #[error("block range extends beyond current head block")]
    BlockRangeExceedsHead,
    /// The query spans more blocks than allowed; carries the maximum.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// The query produced more logs than allowed.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response
        max_logs: usize,
        /// Start block of the suggested retry range
        from_block: u64,
        /// End block of the suggested retry range
        to_block: u64,
    },
    /// Error serving request in `eth_` namespace.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// Internal failure, e.g. a spawned task went away without delivering its response.
    #[error("internal filter error")]
    InternalError,
}
929
930impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
931 fn from(err: EthFilterError) -> Self {
932 match err {
933 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
934 jsonrpsee::types::error::INVALID_PARAMS_CODE,
935 "filter not found",
936 ),
937 err @ EthFilterError::InternalError => {
938 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
939 }
940 EthFilterError::EthAPIError(err) => err.into(),
941 err @ (EthFilterError::InvalidBlockRangeParams |
942 EthFilterError::QueryExceedsMaxBlocks(_) |
943 EthFilterError::QueryExceedsMaxResults { .. } |
944 EthFilterError::BlockRangeExceedsHead) => {
945 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
946 }
947 }
948 }
949}
950
951impl From<ProviderError> for EthFilterError {
952 fn from(err: ProviderError) -> Self {
953 Self::EthAPIError(err.into())
954 }
955}
956
957impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
958 fn from(err: logs_utils::FilterBlockRangeError) -> Self {
959 match err {
960 logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
961 logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
962 }
963 }
964}
965
/// Helper type for the common pattern of returning receipts, block and the original header that
/// is a match for the filter.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// All receipts of the matching block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// The recovered block, when available; consumers fall back to the provider when `None`.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// The sealed header of the matching block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
979
/// Represents different modes for processing block ranges when filtering logs
enum RangeMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
        + EthApiTypes
        + LoadReceipt
        + EthBlocks
        + 'static,
> {
    /// Fetches receipts (and blocks, if available) for each header through the state cache.
    Cached(CachedMode<Eth>),
    /// Groups headers into consecutive ranges and fetches receipts per range.
    Range(RangeBlockMode<Eth>),
}
993
994impl<
995 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
996 + EthApiTypes
997 + LoadReceipt
998 + EthBlocks
999 + 'static,
1000 > RangeMode<Eth>
1001{
1002 fn new(
1004 filter_inner: Arc<EthFilterInner<Eth>>,
1005 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1006 from_block: u64,
1007 to_block: u64,
1008 max_headers_range: u64,
1009 chain_tip: u64,
1010 ) -> Self {
1011 let block_count = to_block - from_block + 1;
1012 let distance_from_tip = chain_tip.saturating_sub(to_block);
1013
1014 let use_cached_mode =
1016 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1017
1018 if use_cached_mode && !sealed_headers.is_empty() {
1019 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1020 } else {
1021 Self::Range(RangeBlockMode {
1022 filter_inner,
1023 iter: sealed_headers.into_iter().peekable(),
1024 next: VecDeque::new(),
1025 max_range: max_headers_range as usize,
1026 pending_tasks: FuturesOrdered::new(),
1027 })
1028 }
1029 }
1030
1031 const fn should_use_cached_mode(
1033 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1034 block_count: u64,
1035 distance_from_tip: u64,
1036 ) -> bool {
1037 let bloom_matches = headers.len();
1039
1040 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1042
1043 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1044 }
1045
1046 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1048 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1050 return CACHED_MODE_BLOCK_THRESHOLD;
1051 }
1052
1053 match bloom_matches {
1054 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1055 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1056 _ => CACHED_MODE_BLOCK_THRESHOLD,
1057 }
1058 }
1059
1060 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1062 match self {
1063 Self::Cached(cached) => cached.next().await,
1064 Self::Range(range) => range.next().await,
1065 }
1066 }
1067}
1068
/// Mode for processing blocks by fetching their receipts through the state cache
struct CachedMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
        + EthApiTypes
        + LoadReceipt
        + EthBlocks
        + 'static,
> {
    /// Shared filter state, used to reach the state cache.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Remaining headers that matched the filter's bloom.
    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
1080
1081impl<
1082 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1083 + EthApiTypes
1084 + LoadReceipt
1085 + EthBlocks
1086 + 'static,
1087 > CachedMode<Eth>
1088{
1089 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1090 for header in self.headers_iter.by_ref() {
1091 if let Some((receipts, maybe_block)) =
1093 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1094 {
1095 return Ok(Some(ReceiptBlockResult {
1096 receipts,
1097 recovered_block: maybe_block,
1098 header,
1099 }));
1100 }
1101 }
1102
1103 Ok(None) }
1105}
1106
/// Boxed, sendable future that resolves to the receipt results of one chunk of headers.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1110
/// Mode for processing blocks in ranges of consecutive headers
struct RangeBlockMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
        + EthApiTypes
        + LoadReceipt
        + EthBlocks
        + 'static,
> {
    /// Shared filter state, used to reach the state cache and the provider.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Remaining headers that matched the filter's bloom.
    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
    /// Results that are ready to be handed out.
    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
    /// Maximum number of headers collected into one range.
    max_range: usize,
    /// In-flight parallel receipt fetches, resolved in the order they were queued.
    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
}
1126
1127impl<
1128 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1129 + EthApiTypes
1130 + LoadReceipt
1131 + EthBlocks
1132 + 'static,
1133 > RangeBlockMode<Eth>
1134{
    /// Returns the next block result, or `None` once all headers have been processed.
    ///
    /// Results are served from the internal buffer first, then from pending parallel tasks.
    /// Only when both are empty is the next range of consecutive headers collected and either
    /// processed inline or handed to parallel tasks.
    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
        loop {
            // hand out buffered results first
            if let Some(result) = self.next.pop_front() {
                return Ok(Some(result));
            }

            // then wait for the oldest pending task, if any, and buffer its results
            if let Some(task_result) = self.pending_tasks.next().await {
                self.next.extend(task_result?);
                continue;
            }

            // nothing buffered and nothing pending: start a new range
            let Some(next_header) = self.iter.next() else {
                // all headers have been processed
                return Ok(None);
            };

            let mut range_headers = Vec::with_capacity(self.max_range);
            range_headers.push(next_header);

            // extend the range while the following headers are consecutive
            while range_headers.len() < self.max_range {
                let Some(peeked) = self.iter.peek() else { break };
                let Some(last_header) = range_headers.last() else { break };

                let expected_next = last_header.number() + 1;
                if peeked.number() != expected_next {
                    trace!(
                        target: "rpc::eth::filter",
                        last_block = last_header.number(),
                        next_block = peeked.number(),
                        expected = expected_next,
                        range_size = range_headers.len(),
                        "Non-consecutive block detected, stopping range collection"
                    );
                    // the gap starts a new range on the next iteration
                    break;
                }

                let Some(next_header) = self.iter.next() else { break };
                range_headers.push(next_header);
            }

            // the number of headers still to process (including this range) decides whether
            // receipts are fetched in parallel
            let remaining_headers = self.iter.len() + range_headers.len();
            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
                self.spawn_parallel_tasks(range_headers);
                // the results are picked up from `pending_tasks` on the next iteration
            } else {
                if let Some(result) = self.process_small_range(range_headers).await? {
                    return Ok(Some(result));
                }
                // the range yielded nothing, continue with the next one
            }
        }
    }
1193
1194 async fn process_small_range(
1198 &mut self,
1199 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1200 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1201 for header in range_headers {
1203 let (maybe_block, maybe_receipts) = self
1205 .filter_inner
1206 .eth_cache()
1207 .maybe_cached_block_and_receipts(header.hash())
1208 .await?;
1209
1210 let receipts = match maybe_receipts {
1211 Some(receipts) => receipts,
1212 None => {
1213 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1215 Some(receipts) => Arc::new(receipts),
1216 None => continue, }
1218 }
1219 };
1220
1221 if !receipts.is_empty() {
1222 self.next.push_back(ReceiptBlockResult {
1223 receipts,
1224 recovered_block: maybe_block,
1225 header,
1226 });
1227 }
1228 }
1229
1230 Ok(self.next.pop_front())
1231 }
1232
    /// Queues parallel receipt fetches for the given range of headers.
    ///
    /// The range is split into chunks of roughly `len / DEFAULT_PARALLEL_CONCURRENCY` headers
    /// (at least one header per chunk). Every chunk is read from the provider on its own
    /// blocking task and the resulting future is appended to `pending_tasks`.
    fn spawn_parallel_tasks(
        &mut self,
        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
    ) {
        // split the headers into chunks, one blocking task per chunk
        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
        let header_chunks = range_headers
            .into_iter()
            .chunks(chunk_size)
            .into_iter()
            .map(|chunk| chunk.collect::<Vec<_>>())
            .collect::<Vec<_>>();

        for chunk_headers in header_chunks {
            let filter_inner = self.filter_inner.clone();
            let chunk_task = Box::pin(async move {
                let chunk_task = tokio::task::spawn_blocking(move || {
                    let mut chunk_results = Vec::new();

                    for header in chunk_headers {
                        // read receipts straight from the provider (no cache lookup here)
                        let receipts = match filter_inner
                            .provider()
                            .receipts_by_block(header.hash().into())?
                        {
                            Some(receipts) => Arc::new(receipts),
                            // no receipts known for this block
                            None => continue,
                        };

                        // blocks without receipts cannot contain matching logs
                        if !receipts.is_empty() {
                            chunk_results.push(ReceiptBlockResult {
                                receipts,
                                recovered_block: None,
                                header,
                            });
                        }
                    }

                    Ok(chunk_results)
                });

                // flatten the join result: a failed join is reported as an internal error
                match chunk_task.await {
                    Ok(Ok(chunk_results)) => Ok(chunk_results),
                    Ok(Err(e)) => Err(e),
                    Err(join_err) => {
                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
                        Err(EthFilterError::InternalError)
                    }
                }
            });

            self.pending_tasks.push_back(chunk_task);
        }
    }
1294}
1295
1296#[cfg(test)]
1297mod tests {
1298 use super::*;
1299 use crate::{eth::EthApi, EthApiBuilder};
1300 use alloy_network::Ethereum;
1301 use alloy_primitives::FixedBytes;
1302 use rand::Rng;
1303 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1304 use reth_ethereum_primitives::TxType;
1305 use reth_evm_ethereum::EthEvmConfig;
1306 use reth_network_api::noop::NoopNetwork;
1307 use reth_provider::test_utils::MockEthProvider;
1308 use reth_rpc_convert::RpcConverter;
1309 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1310 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1311 use reth_tasks::TokioTaskExecutor;
1312 use reth_testing_utils::generators;
1313 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1314 use std::{collections::VecDeque, sync::Arc};
1315
/// Walks a randomly sized range with a random step and checks that the produced sub-ranges
/// start at the range start, follow each other without gaps and end at the range end.
#[test]
fn test_block_range_iter() {
    let mut rng = generators::rng();

    let first_block = u64::from(rng.random::<u32>());
    let last_block = first_block.saturating_add(u64::from(rng.random::<u32>()));
    let step = u64::from(rng.random::<u16>());

    let mut windows = BlockRangeInclusiveIter::new(first_block..=last_block, step);

    // The first window starts at the range start and is capped at the range end.
    let (from, first_to) = windows.next().unwrap();
    assert_eq!(from, first_block);
    assert_eq!(first_to, (from + step).min(last_block));

    // Every following window must begin right after the previous one ended.
    let final_to = windows.fold(first_to, |previous_to, (next_from, next_to)| {
        assert_eq!(next_from, previous_to + 1);
        next_to
    });

    assert_eq!(final_to, last_block);
}
1337
1338 #[expect(clippy::type_complexity)]
1340 fn build_test_eth_api(
1341 provider: MockEthProvider,
1342 ) -> EthApi<
1343 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1344 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1345 > {
1346 EthApiBuilder::new(
1347 provider.clone(),
1348 testing_pool(),
1349 NoopNetwork::default(),
1350 EthEvmConfig::new(provider.chain_spec()),
1351 )
1352 .build()
1353 }
1354
1355 #[tokio::test]
1356 async fn test_range_block_mode_empty_range() {
1357 let provider = MockEthProvider::default();
1358 let eth_api = build_test_eth_api(provider);
1359
1360 let eth_filter = super::EthFilter::new(
1361 eth_api,
1362 EthFilterConfig::default(),
1363 Box::new(TokioTaskExecutor::default()),
1364 );
1365 let filter_inner = eth_filter.inner;
1366
1367 let headers = vec![];
1368 let max_range = 100;
1369
1370 let mut range_mode = RangeBlockMode {
1371 filter_inner,
1372 iter: headers.into_iter().peekable(),
1373 next: VecDeque::new(),
1374 max_range,
1375 pending_tasks: FuturesOrdered::new(),
1376 };
1377
1378 let result = range_mode.next().await;
1379 assert!(result.is_ok());
1380 assert!(result.unwrap().is_none());
1381 }
1382
1383 #[tokio::test]
1384 async fn test_range_block_mode_queued_results_priority() {
1385 let provider = MockEthProvider::default();
1386 let eth_api = build_test_eth_api(provider);
1387
1388 let eth_filter = super::EthFilter::new(
1389 eth_api,
1390 EthFilterConfig::default(),
1391 Box::new(TokioTaskExecutor::default()),
1392 );
1393 let filter_inner = eth_filter.inner;
1394
1395 let headers = vec![
1396 SealedHeader::new(
1397 alloy_consensus::Header { number: 100, ..Default::default() },
1398 FixedBytes::random(),
1399 ),
1400 SealedHeader::new(
1401 alloy_consensus::Header { number: 101, ..Default::default() },
1402 FixedBytes::random(),
1403 ),
1404 ];
1405
1406 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1408 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1409
1410 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1412 tx_type: TxType::Legacy,
1413 cumulative_gas_used: 100_000,
1414 logs: vec![],
1415 success: true,
1416 };
1417 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1418 tx_type: TxType::Eip1559,
1419 cumulative_gas_used: 200_000,
1420 logs: vec![],
1421 success: true,
1422 };
1423 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1424 tx_type: TxType::Eip2930,
1425 cumulative_gas_used: 150_000,
1426 logs: vec![],
1427 success: false, };
1429
1430 let mock_result_1 = ReceiptBlockResult {
1431 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1432 recovered_block: None,
1433 header: SealedHeader::new(
1434 alloy_consensus::Header { number: 42, ..Default::default() },
1435 expected_block_hash_1,
1436 ),
1437 };
1438
1439 let mock_result_2 = ReceiptBlockResult {
1440 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1441 recovered_block: None,
1442 header: SealedHeader::new(
1443 alloy_consensus::Header { number: 43, ..Default::default() },
1444 expected_block_hash_2,
1445 ),
1446 };
1447
1448 let mut range_mode = RangeBlockMode {
1449 filter_inner,
1450 iter: headers.into_iter().peekable(),
1451 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1453 pending_tasks: FuturesOrdered::new(),
1454 };
1455
1456 let result1 = range_mode.next().await;
1458 assert!(result1.is_ok());
1459 let receipt_result1 = result1.unwrap().unwrap();
1460 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1461 assert_eq!(receipt_result1.header.number, 42);
1462
1463 assert_eq!(receipt_result1.receipts.len(), 2);
1465 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1466 assert_eq!(
1467 receipt_result1.receipts[0].cumulative_gas_used,
1468 mock_receipt_1.cumulative_gas_used
1469 );
1470 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1471 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1472 assert_eq!(
1473 receipt_result1.receipts[1].cumulative_gas_used,
1474 mock_receipt_2.cumulative_gas_used
1475 );
1476 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1477
1478 let result2 = range_mode.next().await;
1480 assert!(result2.is_ok());
1481 let receipt_result2 = result2.unwrap().unwrap();
1482 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1483 assert_eq!(receipt_result2.header.number, 43);
1484
1485 assert_eq!(receipt_result2.receipts.len(), 1);
1487 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1488 assert_eq!(
1489 receipt_result2.receipts[0].cumulative_gas_used,
1490 mock_receipt_3.cumulative_gas_used
1491 );
1492 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1493
1494 assert!(range_mode.next.is_empty());
1496
1497 let result3 = range_mode.next().await;
1498 assert!(result3.is_ok());
1499 }
1500
1501 #[tokio::test]
1502 async fn test_range_block_mode_single_block_no_receipts() {
1503 let provider = MockEthProvider::default();
1504 let eth_api = build_test_eth_api(provider);
1505
1506 let eth_filter = super::EthFilter::new(
1507 eth_api,
1508 EthFilterConfig::default(),
1509 Box::new(TokioTaskExecutor::default()),
1510 );
1511 let filter_inner = eth_filter.inner;
1512
1513 let headers = vec![SealedHeader::new(
1514 alloy_consensus::Header { number: 100, ..Default::default() },
1515 FixedBytes::random(),
1516 )];
1517
1518 let mut range_mode = RangeBlockMode {
1519 filter_inner,
1520 iter: headers.into_iter().peekable(),
1521 next: VecDeque::new(),
1522 max_range: 100,
1523 pending_tasks: FuturesOrdered::new(),
1524 };
1525
1526 let result = range_mode.next().await;
1527 assert!(result.is_ok());
1528 }
1529
1530 #[tokio::test]
1531 async fn test_range_block_mode_provider_receipts() {
1532 let provider = MockEthProvider::default();
1533
1534 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1535 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1536 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1537
1538 let block_hash_1 = FixedBytes::random();
1539 let block_hash_2 = FixedBytes::random();
1540 let block_hash_3 = FixedBytes::random();
1541
1542 provider.add_header(block_hash_1, header_1.clone());
1543 provider.add_header(block_hash_2, header_2.clone());
1544 provider.add_header(block_hash_3, header_3.clone());
1545
1546 let mock_log = alloy_primitives::Log {
1548 address: alloy_primitives::Address::ZERO,
1549 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1550 };
1551
1552 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1553 tx_type: TxType::Legacy,
1554 cumulative_gas_used: 21_000,
1555 logs: vec![mock_log.clone()],
1556 success: true,
1557 };
1558 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1559 tx_type: TxType::Eip1559,
1560 cumulative_gas_used: 42_000,
1561 logs: vec![mock_log.clone()],
1562 success: true,
1563 };
1564 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1565 tx_type: TxType::Eip2930,
1566 cumulative_gas_used: 30_000,
1567 logs: vec![mock_log.clone()],
1568 success: false,
1569 };
1570
1571 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1572 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1573
1574 let eth_api = build_test_eth_api(provider);
1575
1576 let eth_filter = super::EthFilter::new(
1577 eth_api,
1578 EthFilterConfig::default(),
1579 Box::new(TokioTaskExecutor::default()),
1580 );
1581 let filter_inner = eth_filter.inner;
1582
1583 let headers = vec![
1584 SealedHeader::new(header_1, block_hash_1),
1585 SealedHeader::new(header_2, block_hash_2),
1586 SealedHeader::new(header_3, block_hash_3),
1587 ];
1588
1589 let mut range_mode = RangeBlockMode {
1590 filter_inner,
1591 iter: headers.into_iter().peekable(),
1592 next: VecDeque::new(),
1593 max_range: 3, pending_tasks: FuturesOrdered::new(),
1595 };
1596
1597 let result = range_mode.next().await;
1599 assert!(result.is_ok());
1600 let receipt_result = result.unwrap().unwrap();
1601
1602 assert_eq!(receipt_result.header.hash(), block_hash_1);
1603 assert_eq!(receipt_result.header.number, 100);
1604 assert_eq!(receipt_result.receipts.len(), 2);
1605
1606 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1608 assert_eq!(
1609 receipt_result.receipts[0].cumulative_gas_used,
1610 receipt_100_1.cumulative_gas_used
1611 );
1612 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1613
1614 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1615 assert_eq!(
1616 receipt_result.receipts[1].cumulative_gas_used,
1617 receipt_100_2.cumulative_gas_used
1618 );
1619 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1620
1621 let result2 = range_mode.next().await;
1623 assert!(result2.is_ok());
1624 let receipt_result2 = result2.unwrap().unwrap();
1625
1626 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1627 assert_eq!(receipt_result2.header.number, 101);
1628 assert_eq!(receipt_result2.receipts.len(), 1);
1629
1630 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1632 assert_eq!(
1633 receipt_result2.receipts[0].cumulative_gas_used,
1634 receipt_101_1.cumulative_gas_used
1635 );
1636 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1637
1638 let result3 = range_mode.next().await;
1640 assert!(result3.is_ok());
1641 assert!(result3.unwrap().is_none());
1642 }
1643
1644 #[tokio::test]
1645 async fn test_range_block_mode_iterator_exhaustion() {
1646 let provider = MockEthProvider::default();
1647
1648 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1649 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1650
1651 let block_hash_100 = FixedBytes::random();
1652 let block_hash_101 = FixedBytes::random();
1653
1654 provider.add_header(block_hash_100, header_100.clone());
1656 provider.add_header(block_hash_101, header_101.clone());
1657
1658 let mock_receipt = reth_ethereum_primitives::Receipt {
1660 tx_type: TxType::Legacy,
1661 cumulative_gas_used: 21_000,
1662 logs: vec![],
1663 success: true,
1664 };
1665 provider.add_receipts(100, vec![mock_receipt.clone()]);
1666 provider.add_receipts(101, vec![mock_receipt.clone()]);
1667
1668 let eth_api = build_test_eth_api(provider);
1669
1670 let eth_filter = super::EthFilter::new(
1671 eth_api,
1672 EthFilterConfig::default(),
1673 Box::new(TokioTaskExecutor::default()),
1674 );
1675 let filter_inner = eth_filter.inner;
1676
1677 let headers = vec![
1678 SealedHeader::new(header_100, block_hash_100),
1679 SealedHeader::new(header_101, block_hash_101),
1680 ];
1681
1682 let mut range_mode = RangeBlockMode {
1683 filter_inner,
1684 iter: headers.into_iter().peekable(),
1685 next: VecDeque::new(),
1686 max_range: 1,
1687 pending_tasks: FuturesOrdered::new(),
1688 };
1689
1690 let result1 = range_mode.next().await;
1691 assert!(result1.is_ok());
1692 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1697 assert!(result2.is_ok());
1698 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1702
1703 let result3 = range_mode.next().await;
1705 assert!(result3.is_ok());
1706 assert!(result3.unwrap().is_none());
1707 }
1708
1709 #[tokio::test]
1710 async fn test_cached_mode_with_mock_receipts() {
1711 let test_hash = FixedBytes::from([42u8; 32]);
1713 let test_block_number = 100u64;
1714 let test_header = SealedHeader::new(
1715 alloy_consensus::Header {
1716 number: test_block_number,
1717 gas_used: 50_000,
1718 ..Default::default()
1719 },
1720 test_hash,
1721 );
1722
1723 let mock_log = alloy_primitives::Log {
1725 address: alloy_primitives::Address::ZERO,
1726 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1727 };
1728
1729 let mock_receipt = reth_ethereum_primitives::Receipt {
1730 tx_type: TxType::Legacy,
1731 cumulative_gas_used: 21_000,
1732 logs: vec![mock_log],
1733 success: true,
1734 };
1735
1736 let provider = MockEthProvider::default();
1737 provider.add_header(test_hash, test_header.header().clone());
1738 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1739
1740 let eth_api = build_test_eth_api(provider);
1741 let eth_filter = super::EthFilter::new(
1742 eth_api,
1743 EthFilterConfig::default(),
1744 Box::new(TokioTaskExecutor::default()),
1745 );
1746 let filter_inner = eth_filter.inner;
1747
1748 let headers = vec![test_header.clone()];
1749
1750 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1751
1752 let result = cached_mode.next().await.expect("next should succeed");
1754 let receipt_block_result = result.expect("should have receipt result");
1755 assert_eq!(receipt_block_result.header.hash(), test_hash);
1756 assert_eq!(receipt_block_result.header.number, test_block_number);
1757 assert_eq!(receipt_block_result.receipts.len(), 1);
1758 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1759 assert_eq!(
1760 receipt_block_result.receipts[0].cumulative_gas_used,
1761 mock_receipt.cumulative_gas_used
1762 );
1763 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1764
1765 let result2 = cached_mode.next().await;
1767 assert!(result2.is_ok());
1768 assert!(result2.unwrap().is_none());
1769 }
1770
1771 #[tokio::test]
1772 async fn test_cached_mode_empty_headers() {
1773 let provider = MockEthProvider::default();
1774 let eth_api = build_test_eth_api(provider);
1775
1776 let eth_filter = super::EthFilter::new(
1777 eth_api,
1778 EthFilterConfig::default(),
1779 Box::new(TokioTaskExecutor::default()),
1780 );
1781 let filter_inner = eth_filter.inner;
1782
1783 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1784
1785 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1786
1787 let result = cached_mode.next().await.expect("next should succeed");
1789 assert!(result.is_none());
1790 }
1791
1792 #[tokio::test]
1793 async fn test_non_consecutive_headers_after_bloom_filter() {
1794 let provider = MockEthProvider::default();
1795
1796 let mut expected_hashes = vec![];
1798 let mut prev_hash = alloy_primitives::B256::default();
1799
1800 use alloy_consensus::TxLegacy;
1802 use reth_ethereum_primitives::{TransactionSigned, TxType};
1803
1804 let tx_inner = TxLegacy {
1805 chain_id: Some(1),
1806 nonce: 0,
1807 gas_price: 21_000,
1808 gas_limit: 21_000,
1809 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1810 value: alloy_primitives::U256::ZERO,
1811 input: alloy_primitives::Bytes::new(),
1812 };
1813 let signature = alloy_primitives::Signature::test_signature();
1814 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1815
1816 for i in 100u64..=103 {
1817 let header = alloy_consensus::Header {
1818 number: i,
1819 parent_hash: prev_hash,
1820 logs_bloom: if i == 100 || i == 102 {
1822 alloy_primitives::Bloom::from([1u8; 256])
1823 } else {
1824 alloy_primitives::Bloom::default()
1825 },
1826 ..Default::default()
1827 };
1828
1829 let hash = header.hash_slow();
1830 expected_hashes.push(hash);
1831 prev_hash = hash;
1832
1833 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1835
1836 let block = reth_ethereum_primitives::Block {
1837 header,
1838 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1839 };
1840 provider.add_block(hash, block);
1841 }
1842
1843 let mock_log = alloy_primitives::Log {
1845 address: alloy_primitives::Address::ZERO,
1846 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1847 };
1848
1849 let receipt = reth_ethereum_primitives::Receipt {
1850 tx_type: TxType::Legacy,
1851 cumulative_gas_used: 21_000,
1852 logs: vec![mock_log],
1853 success: true,
1854 };
1855
1856 provider.add_receipts(100, vec![receipt.clone()]);
1857 provider.add_receipts(101, vec![]);
1858 provider.add_receipts(102, vec![receipt.clone()]);
1859 provider.add_receipts(103, vec![]);
1860
1861 use reth_db_api::models::StoredBlockBodyIndices;
1863 provider
1864 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1865 provider
1866 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1867 provider
1868 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1869 provider
1870 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1871
1872 let eth_api = build_test_eth_api(provider);
1873 let eth_filter = EthFilter::new(
1874 eth_api,
1875 EthFilterConfig::default(),
1876 Box::new(TokioTaskExecutor::default()),
1877 );
1878
1879 let filter = Filter::default();
1881
1882 let logs = eth_filter
1884 .inner
1885 .clone()
1886 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1887 .await
1888 .expect("should succeed");
1889
1890 assert_eq!(logs.len(), 2);
1892
1893 assert_eq!(logs[0].block_number, Some(100));
1894 assert_eq!(logs[1].block_number, Some(102));
1895
1896 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1900}