1use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8 PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12 future::TryFutureExt,
13 stream::{FuturesOrdered, StreamExt},
14 Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21 helpers::{EthBlocks, LoadReceipt},
22 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23 RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32 ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37 collections::{HashMap, VecDeque},
38 fmt,
39 iter::{Peekable, StepBy},
40 ops::RangeInclusive,
41 pin::Pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 sync::{mpsc::Receiver, oneshot, Mutex},
47 time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53 Eth: FullEthApiTypes
54 + RpcNodeCoreExt<Provider: BlockIdReader>
55 + LoadReceipt
56 + EthBlocks
57 + 'static,
58{
59 fn logs(
61 &self,
62 filter: Filter,
63 limits: QueryLimits,
64 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65 trace!(target: "rpc::eth", "Serving eth_getLogs");
66 self.logs_for_filter(filter, limits).map_err(|e| e.into())
67 }
68}
69
/// Base threshold (in blocks) for serving a log query from the cache: both the size of the
/// queried range and its distance from the chain tip are compared against it (possibly after
/// being scaled down, see the bloom thresholds below).
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// More bloom-matching headers than this halves the cached-mode threshold.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// More bloom-matching headers than this (but not more than [`HIGH_BLOOM_MATCH_THRESHOLD`])
/// reduces the cached-mode threshold to three quarters.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Ranges of at most this many blocks always use the unscaled cached-mode threshold.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Step used to split a block range into chunks when loading headers; it is also the upper
/// bound on how many consecutive headers are grouped into one receipt range.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// Once at least this many bloom-matching headers remain, receipts are fetched by parallel
/// blocking tasks instead of sequentially.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1_000;

/// Divisor used to derive the per-task chunk size when receipts are fetched in parallel.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91pub struct EthFilter<Eth: EthApiTypes> {
95 inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101 Eth: EthApiTypes,
102{
103 fn clone(&self) -> Self {
104 Self { inner: self.inner.clone() }
105 }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110 Eth: EthApiTypes + 'static,
111{
112 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
140 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141 config;
142 let inner = EthFilterInner {
143 eth_api,
144 active_filters: ActiveFilters::new(),
145 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146 max_headers_range: MAX_HEADERS_RANGE,
147 task_spawner,
148 stale_filter_ttl,
149 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150 };
151
152 let eth_filter = Self { inner: Arc::new(inner) };
153
154 let this = eth_filter.clone();
155 eth_filter.inner.task_spawner.spawn_critical(
156 "eth-filters_stale-filters-clean",
157 Box::pin(async move {
158 this.watch_and_clear_stale_filters().await;
159 }),
160 );
161
162 eth_filter
163 }
164
165 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167 &self.inner.active_filters
168 }
169
170 async fn watch_and_clear_stale_filters(&self) {
173 let mut interval = tokio::time::interval_at(
174 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175 self.inner.stale_filter_ttl,
176 );
177 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178 loop {
179 interval.tick().await;
180 self.clear_stale_filters(Instant::now()).await;
181 }
182 }
183
184 pub async fn clear_stale_filters(&self, now: Instant) {
187 trace!(target: "rpc::eth", "clear stale filters");
188 let mut filters = self.active_filters().inner.lock().await;
189 filters.retain(|id, filter| {
190 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192 if !is_valid {
193 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194 }
195
196 is_valid
197 });
198 filters.shrink_to_fit();
199 }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205 + RpcNodeCoreExt
206 + LoadReceipt
207 + EthBlocks
208 + 'static,
209{
210 fn provider(&self) -> &Eth::Provider {
212 self.inner.eth_api.provider()
213 }
214
215 fn pool(&self) -> &Eth::Pool {
217 self.inner.eth_api.pool()
218 }
219
220 pub async fn filter_changes(
222 &self,
223 id: FilterId,
224 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225 let info = self.provider().chain_info()?;
226 let best_number = info.best_number;
227
228 let (start_block, kind) = {
231 let mut filters = self.inner.active_filters.inner.lock().await;
232 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234 if filter.block > best_number {
235 return Ok(FilterChanges::Empty)
237 }
238
239 let mut block = best_number + 1;
243 std::mem::swap(&mut filter.block, &mut block);
244 filter.last_poll_timestamp = Instant::now();
245
246 (block, filter.kind.clone())
247 };
248
249 match kind {
250 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251 FilterKind::Block => {
252 let end_block = best_number + 1;
255 let block_hashes =
256 self.provider().canonical_hashes_range(start_block, end_block).map_err(
257 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258 )?;
259 Ok(FilterChanges::Hashes(block_hashes))
260 }
261 FilterKind::Log(filter) => {
262 let (from_block_number, to_block_number) = match filter.block_option {
263 FilterBlockOption::Range { from_block, to_block } => {
264 let from = from_block
265 .map(|num| self.provider().convert_block_number(num))
266 .transpose()?
267 .flatten();
268 let to = to_block
269 .map(|num| self.provider().convert_block_number(num))
270 .transpose()?
271 .flatten();
272 logs_utils::get_filter_block_range(from, to, start_block, info)?
273 }
274 FilterBlockOption::AtBlockHash(_) => {
275 (start_block, best_number)
279 }
280 };
281 let logs = self
282 .inner
283 .clone()
284 .get_logs_in_block_range(
285 *filter,
286 from_block_number,
287 to_block_number,
288 self.inner.query_limits,
289 )
290 .await?;
291 Ok(FilterChanges::Logs(logs))
292 }
293 }
294 }
295
296 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
302 let filter = {
303 let mut filters = self.inner.active_filters.inner.lock().await;
304 let filter =
305 filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
306 if let FilterKind::Log(ref inner_filter) = filter.kind {
307 filter.last_poll_timestamp = Instant::now();
308 *inner_filter.clone()
309 } else {
310 return Err(EthFilterError::FilterNotFound(id))
312 }
313 };
314
315 self.logs_for_filter(filter, self.inner.query_limits).await
316 }
317
318 async fn logs_for_filter(
320 &self,
321 filter: Filter,
322 limits: QueryLimits,
323 ) -> Result<Vec<Log>, EthFilterError> {
324 self.inner.clone().logs_for_filter(filter, limits).await
325 }
326}
327
328#[async_trait]
329impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
330where
331 Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
332{
333 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
335 trace!(target: "rpc::eth", "Serving eth_newFilter");
336 self.inner
337 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
338 .await
339 }
340
341 async fn new_block_filter(&self) -> RpcResult<FilterId> {
343 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
344 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
345 }
346
347 async fn new_pending_transaction_filter(
349 &self,
350 kind: Option<PendingTransactionFilterKind>,
351 ) -> RpcResult<FilterId> {
352 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
353
354 let transaction_kind = match kind.unwrap_or_default() {
355 PendingTransactionFilterKind::Hashes => {
356 let receiver = self.pool().pending_transactions_listener();
357 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
358 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
359 }
360 PendingTransactionFilterKind::Full => {
361 let stream = self.pool().new_pending_pool_transactions_listener();
362 let full_txs_receiver = FullTransactionsReceiver::new(
363 stream,
364 dyn_clone::clone(self.inner.eth_api.converter()),
365 );
366 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
367 full_txs_receiver,
368 )))
369 }
370 };
371
372 self.inner.install_filter(transaction_kind).await
374 }
375
376 async fn filter_changes(
378 &self,
379 id: FilterId,
380 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
381 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
382 Ok(Self::filter_changes(self, id).await?)
383 }
384
385 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
391 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
392 Ok(Self::filter_logs(self, id).await?)
393 }
394
395 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
397 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
398 let mut filters = self.inner.active_filters.inner.lock().await;
399 if filters.remove(&id).is_some() {
400 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
401 Ok(true)
402 } else {
403 Ok(false)
404 }
405 }
406
407 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
411 trace!(target: "rpc::eth", "Serving eth_getLogs");
412 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
413 }
414}
415
416impl<Eth> std::fmt::Debug for EthFilter<Eth>
417where
418 Eth: EthApiTypes,
419{
420 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421 f.debug_struct("EthFilter").finish_non_exhaustive()
422 }
423}
424
425#[derive(Debug)]
427struct EthFilterInner<Eth: EthApiTypes> {
428 eth_api: Eth,
430 active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
432 id_provider: Arc<dyn IdProvider>,
434 query_limits: QueryLimits,
436 max_headers_range: u64,
438 task_spawner: Box<dyn TaskSpawner>,
440 stale_filter_ttl: Duration,
442}
443
444impl<Eth> EthFilterInner<Eth>
445where
446 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
447 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
448 + LoadReceipt
449 + EthBlocks
450 + 'static,
451{
452 fn provider(&self) -> &Eth::Provider {
454 self.eth_api.provider()
455 }
456
457 fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
459 self.eth_api.cache()
460 }
461
462 async fn logs_for_filter(
464 self: Arc<Self>,
465 filter: Filter,
466 limits: QueryLimits,
467 ) -> Result<Vec<Log>, EthFilterError> {
468 match filter.block_option {
469 FilterBlockOption::AtBlockHash(block_hash) => {
470 let Some((receipts, maybe_block)) =
472 self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
473 else {
474 return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
475 };
476
477 let header = if let Some(block) = &maybe_block {
479 block.header().clone()
480 } else {
481 self.provider()
482 .header_by_hash_or_number(block_hash.into())?
483 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?
484 };
485
486 let earliest_block = self.provider().earliest_block_number()?;
488 if header.number() < earliest_block {
489 return Err(EthApiError::PrunedHistoryUnavailable.into());
490 }
491
492 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
493
494 let mut all_logs = Vec::new();
495 append_matching_block_logs(
496 &mut all_logs,
497 maybe_block
498 .map(ProviderOrBlock::Block)
499 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
500 &filter,
501 block_num_hash,
502 &receipts,
503 false,
504 header.timestamp(),
505 )?;
506
507 Ok(all_logs)
508 }
509 FilterBlockOption::Range { from_block, to_block } => {
510 if from_block.is_some_and(|b| b.is_pending()) {
512 let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
513 if !(to_block.is_pending() || to_block.is_number()) {
514 return Ok(Vec::new());
516 }
517 if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
519 if let BlockNumberOrTag::Number(to_block) = to_block &&
520 to_block < pending_block.block.number()
521 {
522 return Ok(Vec::new());
524 }
525
526 let info = self.provider().chain_info()?;
527 if pending_block.block.number() > info.best_number {
528 let mut all_logs = Vec::new();
530 let timestamp = pending_block.block.timestamp();
531 let block_num_hash = pending_block.block.num_hash();
532 append_matching_block_logs(
533 &mut all_logs,
534 ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
535 &filter,
536 block_num_hash,
537 &pending_block.receipts,
538 false, timestamp,
540 )?;
541 return Ok(all_logs);
542 }
543 }
544 }
545
546 let info = self.provider().chain_info()?;
547 let start_block = info.best_number;
548 let from = from_block
549 .map(|num| self.provider().convert_block_number(num))
550 .transpose()?
551 .flatten();
552 let to = to_block
553 .map(|num| self.provider().convert_block_number(num))
554 .transpose()?
555 .flatten();
556
557 if let Some(t) = to &&
559 t > info.best_number
560 {
561 return Err(EthFilterError::BlockRangeExceedsHead);
562 }
563
564 if let Some(f) = from &&
565 f > info.best_number
566 {
567 return Ok(Vec::new());
569 }
570
571 let (from_block_number, to_block_number) =
572 logs_utils::get_filter_block_range(from, to, start_block, info)?;
573
574 let earliest_block = self.provider().earliest_block_number()?;
576 if from_block_number < earliest_block {
577 return Err(EthApiError::PrunedHistoryUnavailable.into());
578 }
579
580 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
581 .await
582 }
583 }
584 }
585
586 async fn install_filter(
588 &self,
589 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
590 ) -> RpcResult<FilterId> {
591 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
592 let subscription_id = self.id_provider.next_id();
593
594 let id = match subscription_id {
595 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
596 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
597 };
598 let mut filters = self.active_filters.inner.lock().await;
599 filters.insert(
600 id.clone(),
601 ActiveFilter {
602 block: last_poll_block_number,
603 last_poll_timestamp: Instant::now(),
604 kind,
605 },
606 );
607 Ok(id)
608 }
609
610 async fn get_logs_in_block_range(
616 self: Arc<Self>,
617 filter: Filter,
618 from_block: u64,
619 to_block: u64,
620 limits: QueryLimits,
621 ) -> Result<Vec<Log>, EthFilterError> {
622 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
623
624 if to_block < from_block {
626 return Err(EthFilterError::InvalidBlockRangeParams)
627 }
628
629 if let Some(max_blocks_per_filter) =
630 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
631 {
632 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
633 }
634
635 let (tx, rx) = oneshot::channel();
636 let this = self.clone();
637 self.task_spawner.spawn_blocking(Box::pin(async move {
638 let res =
639 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
640 let _ = tx.send(res);
641 }));
642
643 rx.await.map_err(|_| EthFilterError::InternalError)?
644 }
645
646 async fn get_logs_in_block_range_inner(
655 self: Arc<Self>,
656 filter: &Filter,
657 from_block: u64,
658 to_block: u64,
659 limits: QueryLimits,
660 ) -> Result<Vec<Log>, EthFilterError> {
661 let mut all_logs = Vec::new();
662 let mut matching_headers = Vec::new();
663
664 let chain_tip = self.provider().best_block_number()?;
666
667 for (from, to) in
669 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
670 {
671 let headers = self.provider().headers_range(from..=to)?;
672
673 let mut headers_iter = headers.into_iter().peekable();
674
675 while let Some(header) = headers_iter.next() {
676 if !filter.matches_bloom(header.logs_bloom()) {
677 continue
678 }
679
680 let current_number = header.number();
681
682 let block_hash = match headers_iter.peek() {
683 Some(next_header) if next_header.number() == current_number + 1 => {
684 next_header.parent_hash()
686 }
687 _ => {
688 header.hash_slow()
690 }
691 };
692
693 matching_headers.push(SealedHeader::new(header, block_hash));
694 }
695 }
696
697 let mut range_mode = RangeMode::new(
699 self.clone(),
700 matching_headers,
701 from_block,
702 to_block,
703 self.max_headers_range,
704 chain_tip,
705 );
706
707 while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
709 range_mode.next().await?
710 {
711 let num_hash = header.num_hash();
712 append_matching_block_logs(
713 &mut all_logs,
714 recovered_block
715 .map(ProviderOrBlock::Block)
716 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
717 filter,
718 num_hash,
719 &receipts,
720 false,
721 header.timestamp(),
722 )?;
723
724 let is_multi_block_range = from_block != to_block;
727 if let Some(max_logs_per_response) = limits.max_logs_per_response &&
728 is_multi_block_range &&
729 all_logs.len() > max_logs_per_response
730 {
731 debug!(
732 target: "rpc::eth::filter",
733 logs_found = all_logs.len(),
734 max_logs_per_response,
735 from_block,
736 to_block = num_hash.number,
737 "Query exceeded max logs per response limit"
738 );
739 return Err(EthFilterError::QueryExceedsMaxResults {
740 max_logs: max_logs_per_response,
741 from_block,
742 to_block: num_hash.number,
743 });
744 }
745 }
746
747 Ok(all_logs)
748 }
749}
750
751#[derive(Debug, Clone, Default)]
753pub struct ActiveFilters<T> {
754 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
755}
756
757impl<T> ActiveFilters<T> {
758 pub fn new() -> Self {
760 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
761 }
762}
763
/// A single installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// First block number that has not been reported to this filter yet.
    block: u64,
    /// Time of the last poll; used to detect and evict stale filters.
    last_poll_timestamp: Instant,
    /// What this filter tracks (logs, blocks or pending transactions).
    kind: FilterKind<T>,
}
774
775#[derive(Debug, Clone)]
777struct PendingTransactionsReceiver {
778 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
779}
780
781impl PendingTransactionsReceiver {
782 fn new(receiver: Receiver<TxHash>) -> Self {
783 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
784 }
785
786 async fn drain<T>(&self) -> FilterChanges<T> {
788 let mut pending_txs = Vec::new();
789 let mut prepared_stream = self.txs_receiver.lock().await;
790
791 while let Ok(tx_hash) = prepared_stream.try_recv() {
792 pending_txs.push(tx_hash);
793 }
794
795 FilterChanges::Hashes(pending_txs)
797 }
798}
799
800#[derive(Debug, Clone)]
802struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
803 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
804 converter: TxCompat,
805}
806
807impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
808where
809 T: PoolTransaction + 'static,
810 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
811{
812 fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
814 Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
815 }
816
817 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
819 let mut pending_txs = Vec::new();
820 let mut prepared_stream = self.txs_stream.lock().await;
821
822 while let Ok(tx) = prepared_stream.try_recv() {
823 match self.converter.fill_pending(tx.transaction.to_consensus()) {
824 Ok(tx) => pending_txs.push(tx),
825 Err(err) => {
826 error!(target: "rpc",
827 %err,
828 "Failed to fill txn with block context"
829 );
830 }
831 }
832 }
833 FilterChanges::Transactions(pending_txs)
834 }
835}
836
837#[async_trait]
839trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
840 async fn drain(&self) -> FilterChanges<T>;
841}
842
843#[async_trait]
844impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
845 for FullTransactionsReceiver<T, TxCompat>
846where
847 T: PoolTransaction + 'static,
848 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
849{
850 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
851 Self::drain(self).await
852 }
853}
854
855#[derive(Debug, Clone)]
861enum PendingTransactionKind<T> {
862 Hashes(PendingTransactionsReceiver),
863 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
864}
865
866impl<T: 'static> PendingTransactionKind<T> {
867 async fn drain(&self) -> FilterChanges<T> {
868 match self {
869 Self::Hashes(receiver) => receiver.drain().await,
870 Self::FullTransaction(receiver) => receiver.drain().await,
871 }
872 }
873}
874
/// What an installed filter tracks.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// A log filter; boxed to keep the enum small.
    Log(Box<Filter>),
    /// A filter for new blocks.
    Block,
    /// A filter for pending transactions.
    PendingTransaction(PendingTransactionKind<T>),
}
881
/// Iterator that splits an inclusive block range into consecutive, non-overlapping inclusive
/// sub-ranges `(start, end)`, each spanning at most `step + 1` blocks.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block number of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block number of the overall range (inclusive).
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose sub-ranges satisfy `end - start <= step`.
    ///
    /// A `step` of `0` yields one sub-range per block. A `step` too large for the platform's
    /// `usize` stride is clamped instead of overflowing.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Consecutive sub-ranges start `step + 1` blocks apart; saturate so that a huge `step`
        // cannot overflow the stride.
        let stride = usize::try_from(step).unwrap_or(usize::MAX).saturating_add(1);
        // Keep the sub-range width consistent with the (possibly clamped) stride so that
        // sub-ranges never overlap.
        let step = u64::try_from(stride - 1).unwrap_or(u64::MAX);
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next `(start, end)` sub-range, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // `start` never exceeds `self.end`, so clamping the saturated sum keeps
        // `start <= end` even right below `u64::MAX`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
908
/// Errors that can occur while handling filter RPC requests.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// No suitable filter is installed under the given id.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// The requested block range is invalid, e.g. it ends before it starts.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// The requested block range ends beyond the current head block.
    #[error("block range extends beyond current head block")]
    BlockRangeExceedsHead,
    /// The query spans more blocks than the configured maximum (carried in the variant).
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// The query produced more logs than allowed in a single response.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response.
        max_logs: usize,
        /// First block of the query.
        from_block: u64,
        /// Block at which the limit was exceeded.
        to_block: u64,
    },
    /// Error originating from the `eth` API layer.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// The log query task ended without delivering a result.
    #[error("internal filter error")]
    InternalError,
}
941
942impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
943 fn from(err: EthFilterError) -> Self {
944 match err {
945 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
946 jsonrpsee::types::error::INVALID_PARAMS_CODE,
947 "filter not found",
948 ),
949 err @ EthFilterError::InternalError => {
950 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
951 }
952 EthFilterError::EthAPIError(err) => err.into(),
953 err @ (EthFilterError::InvalidBlockRangeParams |
954 EthFilterError::QueryExceedsMaxBlocks(_) |
955 EthFilterError::QueryExceedsMaxResults { .. } |
956 EthFilterError::BlockRangeExceedsHead) => {
957 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
958 }
959 }
960 }
961}
962
963impl From<ProviderError> for EthFilterError {
964 fn from(err: ProviderError) -> Self {
965 Self::EthAPIError(err.into())
966 }
967}
968
969impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
970 fn from(err: logs_utils::FilterBlockRangeError) -> Self {
971 match err {
972 logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
973 logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
974 }
975 }
976}
977
/// Receipts of a single block together with the data needed to turn them into logs.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// All receipts of the block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// The recovered block, if it was available when the receipts were loaded.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// The sealed header of the block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
991
992enum RangeMode<
994 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
995 + EthApiTypes
996 + LoadReceipt
997 + EthBlocks
998 + 'static,
999> {
1000 Cached(CachedMode<Eth>),
1002 Range(RangeBlockMode<Eth>),
1004}
1005
1006impl<
1007 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1008 + EthApiTypes
1009 + LoadReceipt
1010 + EthBlocks
1011 + 'static,
1012 > RangeMode<Eth>
1013{
1014 fn new(
1016 filter_inner: Arc<EthFilterInner<Eth>>,
1017 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1018 from_block: u64,
1019 to_block: u64,
1020 max_headers_range: u64,
1021 chain_tip: u64,
1022 ) -> Self {
1023 let block_count = to_block - from_block + 1;
1024 let distance_from_tip = chain_tip.saturating_sub(to_block);
1025
1026 let use_cached_mode =
1028 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1029
1030 if use_cached_mode && !sealed_headers.is_empty() {
1031 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1032 } else {
1033 Self::Range(RangeBlockMode {
1034 filter_inner,
1035 iter: sealed_headers.into_iter().peekable(),
1036 next: VecDeque::new(),
1037 max_range: max_headers_range as usize,
1038 pending_tasks: FuturesOrdered::new(),
1039 })
1040 }
1041 }
1042
1043 const fn should_use_cached_mode(
1045 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1046 block_count: u64,
1047 distance_from_tip: u64,
1048 ) -> bool {
1049 let bloom_matches = headers.len();
1051
1052 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1054
1055 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1056 }
1057
1058 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1060 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1062 return CACHED_MODE_BLOCK_THRESHOLD;
1063 }
1064
1065 match bloom_matches {
1066 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1067 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1068 _ => CACHED_MODE_BLOCK_THRESHOLD,
1069 }
1070 }
1071
1072 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1074 match self {
1075 Self::Cached(cached) => cached.next().await,
1076 Self::Range(range) => range.next().await,
1077 }
1078 }
1079}
1080
1081struct CachedMode<
1083 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1084 + EthApiTypes
1085 + LoadReceipt
1086 + EthBlocks
1087 + 'static,
1088> {
1089 filter_inner: Arc<EthFilterInner<Eth>>,
1090 headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1091}
1092
1093impl<
1094 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1095 + EthApiTypes
1096 + LoadReceipt
1097 + EthBlocks
1098 + 'static,
1099 > CachedMode<Eth>
1100{
1101 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1102 for header in self.headers_iter.by_ref() {
1103 if let Some((receipts, maybe_block)) =
1105 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1106 {
1107 return Ok(Some(ReceiptBlockResult {
1108 receipts,
1109 recovered_block: maybe_block,
1110 header,
1111 }));
1112 }
1113 }
1114
1115 Ok(None) }
1117}
1118
/// Boxed, sendable future that resolves to the receipt results of one chunk of headers.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1122
1123struct RangeBlockMode<
1125 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1126 + EthApiTypes
1127 + LoadReceipt
1128 + EthBlocks
1129 + 'static,
1130> {
1131 filter_inner: Arc<EthFilterInner<Eth>>,
1132 iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1133 next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1134 max_range: usize,
1135 pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1137}
1138
1139impl<
1140 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1141 + EthApiTypes
1142 + LoadReceipt
1143 + EthBlocks
1144 + 'static,
1145 > RangeBlockMode<Eth>
1146{
1147 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1148 loop {
1149 if let Some(result) = self.next.pop_front() {
1151 return Ok(Some(result));
1152 }
1153
1154 if let Some(task_result) = self.pending_tasks.next().await {
1156 self.next.extend(task_result?);
1157 continue;
1158 }
1159
1160 let Some(next_header) = self.iter.next() else {
1162 return Ok(None);
1164 };
1165
1166 let mut range_headers = Vec::with_capacity(self.max_range);
1167 range_headers.push(next_header);
1168
1169 while range_headers.len() < self.max_range {
1171 let Some(peeked) = self.iter.peek() else { break };
1172 let Some(last_header) = range_headers.last() else { break };
1173
1174 let expected_next = last_header.number() + 1;
1175 if peeked.number() != expected_next {
1176 trace!(
1177 target: "rpc::eth::filter",
1178 last_block = last_header.number(),
1179 next_block = peeked.number(),
1180 expected = expected_next,
1181 range_size = range_headers.len(),
1182 "Non-consecutive block detected, stopping range collection"
1183 );
1184 break; }
1186
1187 let Some(next_header) = self.iter.next() else { break };
1188 range_headers.push(next_header);
1189 }
1190
1191 let remaining_headers = self.iter.len() + range_headers.len();
1193 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1194 self.spawn_parallel_tasks(range_headers);
1195 } else {
1197 if let Some(result) = self.process_small_range(range_headers).await? {
1199 return Ok(Some(result));
1200 }
1201 }
1203 }
1204 }
1205
1206 async fn process_small_range(
1210 &mut self,
1211 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1212 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1213 for header in range_headers {
1215 let (maybe_block, maybe_receipts) = self
1217 .filter_inner
1218 .eth_cache()
1219 .maybe_cached_block_and_receipts(header.hash())
1220 .await?;
1221
1222 let receipts = match maybe_receipts {
1223 Some(receipts) => receipts,
1224 None => {
1225 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1227 Some(receipts) => Arc::new(receipts),
1228 None => continue, }
1230 }
1231 };
1232
1233 if !receipts.is_empty() {
1234 self.next.push_back(ReceiptBlockResult {
1235 receipts,
1236 recovered_block: maybe_block,
1237 header,
1238 });
1239 }
1240 }
1241
1242 Ok(self.next.pop_front())
1243 }
1244
1245 fn spawn_parallel_tasks(
1250 &mut self,
1251 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1252 ) {
1253 let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1255 let header_chunks = range_headers
1256 .into_iter()
1257 .chunks(chunk_size)
1258 .into_iter()
1259 .map(|chunk| chunk.collect::<Vec<_>>())
1260 .collect::<Vec<_>>();
1261
1262 for chunk_headers in header_chunks {
1264 let filter_inner = self.filter_inner.clone();
1265 let chunk_task = Box::pin(async move {
1266 let chunk_task = tokio::task::spawn_blocking(move || {
1267 let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1268
1269 for header in chunk_headers {
1270 let receipts = match filter_inner
1273 .provider()
1274 .receipts_by_block(header.hash().into())?
1275 {
1276 Some(receipts) => Arc::new(receipts),
1277 None => continue, };
1279
1280 if !receipts.is_empty() {
1281 chunk_results.push(ReceiptBlockResult {
1282 receipts,
1283 recovered_block: None,
1284 header,
1285 });
1286 }
1287 }
1288
1289 Ok(chunk_results)
1290 });
1291
1292 match chunk_task.await {
1294 Ok(Ok(chunk_results)) => Ok(chunk_results),
1295 Ok(Err(e)) => Err(e),
1296 Err(join_err) => {
1297 trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1298 Err(EthFilterError::InternalError)
1299 }
1300 }
1301 });
1302
1303 self.pending_tasks.push_back(chunk_task);
1304 }
1305 }
1306}
1307
1308#[cfg(test)]
1309mod tests {
1310 use super::*;
1311 use crate::{eth::EthApi, EthApiBuilder};
1312 use alloy_network::Ethereum;
1313 use alloy_primitives::FixedBytes;
1314 use rand::Rng;
1315 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1316 use reth_ethereum_primitives::TxType;
1317 use reth_evm_ethereum::EthEvmConfig;
1318 use reth_network_api::noop::NoopNetwork;
1319 use reth_provider::test_utils::MockEthProvider;
1320 use reth_rpc_convert::RpcConverter;
1321 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1322 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1323 use reth_tasks::TokioTaskExecutor;
1324 use reth_testing_utils::generators;
1325 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1326 use std::{collections::VecDeque, sync::Arc};
1327
/// Checks that `BlockRangeInclusiveIter` yields contiguous sub-ranges that start at the range
/// start, never exceed the range end, and finish exactly on it.
#[test]
fn test_block_range_iter() {
    let mut rng = generators::rng();

    // Random bounds and step, widened from u32/u16 so `first + stride` cannot overflow a u64.
    let first = u64::from(rng.random::<u32>());
    let last = first.saturating_add(u64::from(rng.random::<u32>()));
    let stride = u64::from(rng.random::<u16>());

    let mut sub_ranges = BlockRangeInclusiveIter::new(first..=last, stride);

    let (head_start, head_end) = sub_ranges.next().unwrap();
    assert_eq!(head_start, first);
    assert_eq!(head_end, (head_start + stride).min(last));

    // Every following sub-range has to begin right after the previous one ended.
    let final_end = sub_ranges.fold(head_end, |previous_end, (next_start, next_end)| {
        assert_eq!(next_start, previous_end + 1);
        next_end
    });

    assert_eq!(final_end, last);
}
1349
1350 #[expect(clippy::type_complexity)]
1352 fn build_test_eth_api(
1353 provider: MockEthProvider,
1354 ) -> EthApi<
1355 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1356 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1357 > {
1358 EthApiBuilder::new(
1359 provider.clone(),
1360 testing_pool(),
1361 NoopNetwork::default(),
1362 EthEvmConfig::new(provider.chain_spec()),
1363 )
1364 .build()
1365 }
1366
1367 #[tokio::test]
1368 async fn test_range_block_mode_empty_range() {
1369 let provider = MockEthProvider::default();
1370 let eth_api = build_test_eth_api(provider);
1371
1372 let eth_filter = super::EthFilter::new(
1373 eth_api,
1374 EthFilterConfig::default(),
1375 Box::new(TokioTaskExecutor::default()),
1376 );
1377 let filter_inner = eth_filter.inner;
1378
1379 let headers = vec![];
1380 let max_range = 100;
1381
1382 let mut range_mode = RangeBlockMode {
1383 filter_inner,
1384 iter: headers.into_iter().peekable(),
1385 next: VecDeque::new(),
1386 max_range,
1387 pending_tasks: FuturesOrdered::new(),
1388 };
1389
1390 let result = range_mode.next().await;
1391 assert!(result.is_ok());
1392 assert!(result.unwrap().is_none());
1393 }
1394
1395 #[tokio::test]
1396 async fn test_range_block_mode_queued_results_priority() {
1397 let provider = MockEthProvider::default();
1398 let eth_api = build_test_eth_api(provider);
1399
1400 let eth_filter = super::EthFilter::new(
1401 eth_api,
1402 EthFilterConfig::default(),
1403 Box::new(TokioTaskExecutor::default()),
1404 );
1405 let filter_inner = eth_filter.inner;
1406
1407 let headers = vec![
1408 SealedHeader::new(
1409 alloy_consensus::Header { number: 100, ..Default::default() },
1410 FixedBytes::random(),
1411 ),
1412 SealedHeader::new(
1413 alloy_consensus::Header { number: 101, ..Default::default() },
1414 FixedBytes::random(),
1415 ),
1416 ];
1417
1418 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1420 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1421
1422 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1424 tx_type: TxType::Legacy,
1425 cumulative_gas_used: 100_000,
1426 logs: vec![],
1427 success: true,
1428 };
1429 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1430 tx_type: TxType::Eip1559,
1431 cumulative_gas_used: 200_000,
1432 logs: vec![],
1433 success: true,
1434 };
1435 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1436 tx_type: TxType::Eip2930,
1437 cumulative_gas_used: 150_000,
1438 logs: vec![],
1439 success: false, };
1441
1442 let mock_result_1 = ReceiptBlockResult {
1443 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1444 recovered_block: None,
1445 header: SealedHeader::new(
1446 alloy_consensus::Header { number: 42, ..Default::default() },
1447 expected_block_hash_1,
1448 ),
1449 };
1450
1451 let mock_result_2 = ReceiptBlockResult {
1452 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1453 recovered_block: None,
1454 header: SealedHeader::new(
1455 alloy_consensus::Header { number: 43, ..Default::default() },
1456 expected_block_hash_2,
1457 ),
1458 };
1459
1460 let mut range_mode = RangeBlockMode {
1461 filter_inner,
1462 iter: headers.into_iter().peekable(),
1463 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1465 pending_tasks: FuturesOrdered::new(),
1466 };
1467
1468 let result1 = range_mode.next().await;
1470 assert!(result1.is_ok());
1471 let receipt_result1 = result1.unwrap().unwrap();
1472 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1473 assert_eq!(receipt_result1.header.number, 42);
1474
1475 assert_eq!(receipt_result1.receipts.len(), 2);
1477 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1478 assert_eq!(
1479 receipt_result1.receipts[0].cumulative_gas_used,
1480 mock_receipt_1.cumulative_gas_used
1481 );
1482 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1483 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1484 assert_eq!(
1485 receipt_result1.receipts[1].cumulative_gas_used,
1486 mock_receipt_2.cumulative_gas_used
1487 );
1488 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1489
1490 let result2 = range_mode.next().await;
1492 assert!(result2.is_ok());
1493 let receipt_result2 = result2.unwrap().unwrap();
1494 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1495 assert_eq!(receipt_result2.header.number, 43);
1496
1497 assert_eq!(receipt_result2.receipts.len(), 1);
1499 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1500 assert_eq!(
1501 receipt_result2.receipts[0].cumulative_gas_used,
1502 mock_receipt_3.cumulative_gas_used
1503 );
1504 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1505
1506 assert!(range_mode.next.is_empty());
1508
1509 let result3 = range_mode.next().await;
1510 assert!(result3.is_ok());
1511 }
1512
1513 #[tokio::test]
1514 async fn test_range_block_mode_single_block_no_receipts() {
1515 let provider = MockEthProvider::default();
1516 let eth_api = build_test_eth_api(provider);
1517
1518 let eth_filter = super::EthFilter::new(
1519 eth_api,
1520 EthFilterConfig::default(),
1521 Box::new(TokioTaskExecutor::default()),
1522 );
1523 let filter_inner = eth_filter.inner;
1524
1525 let headers = vec![SealedHeader::new(
1526 alloy_consensus::Header { number: 100, ..Default::default() },
1527 FixedBytes::random(),
1528 )];
1529
1530 let mut range_mode = RangeBlockMode {
1531 filter_inner,
1532 iter: headers.into_iter().peekable(),
1533 next: VecDeque::new(),
1534 max_range: 100,
1535 pending_tasks: FuturesOrdered::new(),
1536 };
1537
1538 let result = range_mode.next().await;
1539 assert!(result.is_ok());
1540 }
1541
1542 #[tokio::test]
1543 async fn test_range_block_mode_provider_receipts() {
1544 let provider = MockEthProvider::default();
1545
1546 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1547 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1548 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1549
1550 let block_hash_1 = FixedBytes::random();
1551 let block_hash_2 = FixedBytes::random();
1552 let block_hash_3 = FixedBytes::random();
1553
1554 provider.add_header(block_hash_1, header_1.clone());
1555 provider.add_header(block_hash_2, header_2.clone());
1556 provider.add_header(block_hash_3, header_3.clone());
1557
1558 let mock_log = alloy_primitives::Log {
1560 address: alloy_primitives::Address::ZERO,
1561 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1562 };
1563
1564 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1565 tx_type: TxType::Legacy,
1566 cumulative_gas_used: 21_000,
1567 logs: vec![mock_log.clone()],
1568 success: true,
1569 };
1570 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1571 tx_type: TxType::Eip1559,
1572 cumulative_gas_used: 42_000,
1573 logs: vec![mock_log.clone()],
1574 success: true,
1575 };
1576 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1577 tx_type: TxType::Eip2930,
1578 cumulative_gas_used: 30_000,
1579 logs: vec![mock_log.clone()],
1580 success: false,
1581 };
1582
1583 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1584 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1585
1586 let eth_api = build_test_eth_api(provider);
1587
1588 let eth_filter = super::EthFilter::new(
1589 eth_api,
1590 EthFilterConfig::default(),
1591 Box::new(TokioTaskExecutor::default()),
1592 );
1593 let filter_inner = eth_filter.inner;
1594
1595 let headers = vec![
1596 SealedHeader::new(header_1, block_hash_1),
1597 SealedHeader::new(header_2, block_hash_2),
1598 SealedHeader::new(header_3, block_hash_3),
1599 ];
1600
1601 let mut range_mode = RangeBlockMode {
1602 filter_inner,
1603 iter: headers.into_iter().peekable(),
1604 next: VecDeque::new(),
1605 max_range: 3, pending_tasks: FuturesOrdered::new(),
1607 };
1608
1609 let result = range_mode.next().await;
1611 assert!(result.is_ok());
1612 let receipt_result = result.unwrap().unwrap();
1613
1614 assert_eq!(receipt_result.header.hash(), block_hash_1);
1615 assert_eq!(receipt_result.header.number, 100);
1616 assert_eq!(receipt_result.receipts.len(), 2);
1617
1618 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1620 assert_eq!(
1621 receipt_result.receipts[0].cumulative_gas_used,
1622 receipt_100_1.cumulative_gas_used
1623 );
1624 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1625
1626 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1627 assert_eq!(
1628 receipt_result.receipts[1].cumulative_gas_used,
1629 receipt_100_2.cumulative_gas_used
1630 );
1631 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1632
1633 let result2 = range_mode.next().await;
1635 assert!(result2.is_ok());
1636 let receipt_result2 = result2.unwrap().unwrap();
1637
1638 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1639 assert_eq!(receipt_result2.header.number, 101);
1640 assert_eq!(receipt_result2.receipts.len(), 1);
1641
1642 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1644 assert_eq!(
1645 receipt_result2.receipts[0].cumulative_gas_used,
1646 receipt_101_1.cumulative_gas_used
1647 );
1648 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1649
1650 let result3 = range_mode.next().await;
1652 assert!(result3.is_ok());
1653 assert!(result3.unwrap().is_none());
1654 }
1655
1656 #[tokio::test]
1657 async fn test_range_block_mode_iterator_exhaustion() {
1658 let provider = MockEthProvider::default();
1659
1660 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1661 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1662
1663 let block_hash_100 = FixedBytes::random();
1664 let block_hash_101 = FixedBytes::random();
1665
1666 provider.add_header(block_hash_100, header_100.clone());
1668 provider.add_header(block_hash_101, header_101.clone());
1669
1670 let mock_receipt = reth_ethereum_primitives::Receipt {
1672 tx_type: TxType::Legacy,
1673 cumulative_gas_used: 21_000,
1674 logs: vec![],
1675 success: true,
1676 };
1677 provider.add_receipts(100, vec![mock_receipt.clone()]);
1678 provider.add_receipts(101, vec![mock_receipt.clone()]);
1679
1680 let eth_api = build_test_eth_api(provider);
1681
1682 let eth_filter = super::EthFilter::new(
1683 eth_api,
1684 EthFilterConfig::default(),
1685 Box::new(TokioTaskExecutor::default()),
1686 );
1687 let filter_inner = eth_filter.inner;
1688
1689 let headers = vec![
1690 SealedHeader::new(header_100, block_hash_100),
1691 SealedHeader::new(header_101, block_hash_101),
1692 ];
1693
1694 let mut range_mode = RangeBlockMode {
1695 filter_inner,
1696 iter: headers.into_iter().peekable(),
1697 next: VecDeque::new(),
1698 max_range: 1,
1699 pending_tasks: FuturesOrdered::new(),
1700 };
1701
1702 let result1 = range_mode.next().await;
1703 assert!(result1.is_ok());
1704 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1709 assert!(result2.is_ok());
1710 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1714
1715 let result3 = range_mode.next().await;
1717 assert!(result3.is_ok());
1718 assert!(result3.unwrap().is_none());
1719 }
1720
1721 #[tokio::test]
1722 async fn test_cached_mode_with_mock_receipts() {
1723 let test_hash = FixedBytes::from([42u8; 32]);
1725 let test_block_number = 100u64;
1726 let test_header = SealedHeader::new(
1727 alloy_consensus::Header {
1728 number: test_block_number,
1729 gas_used: 50_000,
1730 ..Default::default()
1731 },
1732 test_hash,
1733 );
1734
1735 let mock_log = alloy_primitives::Log {
1737 address: alloy_primitives::Address::ZERO,
1738 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1739 };
1740
1741 let mock_receipt = reth_ethereum_primitives::Receipt {
1742 tx_type: TxType::Legacy,
1743 cumulative_gas_used: 21_000,
1744 logs: vec![mock_log],
1745 success: true,
1746 };
1747
1748 let provider = MockEthProvider::default();
1749 provider.add_header(test_hash, test_header.header().clone());
1750 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1751
1752 let eth_api = build_test_eth_api(provider);
1753 let eth_filter = super::EthFilter::new(
1754 eth_api,
1755 EthFilterConfig::default(),
1756 Box::new(TokioTaskExecutor::default()),
1757 );
1758 let filter_inner = eth_filter.inner;
1759
1760 let headers = vec![test_header.clone()];
1761
1762 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1763
1764 let result = cached_mode.next().await.expect("next should succeed");
1766 let receipt_block_result = result.expect("should have receipt result");
1767 assert_eq!(receipt_block_result.header.hash(), test_hash);
1768 assert_eq!(receipt_block_result.header.number, test_block_number);
1769 assert_eq!(receipt_block_result.receipts.len(), 1);
1770 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1771 assert_eq!(
1772 receipt_block_result.receipts[0].cumulative_gas_used,
1773 mock_receipt.cumulative_gas_used
1774 );
1775 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1776
1777 let result2 = cached_mode.next().await;
1779 assert!(result2.is_ok());
1780 assert!(result2.unwrap().is_none());
1781 }
1782
1783 #[tokio::test]
1784 async fn test_cached_mode_empty_headers() {
1785 let provider = MockEthProvider::default();
1786 let eth_api = build_test_eth_api(provider);
1787
1788 let eth_filter = super::EthFilter::new(
1789 eth_api,
1790 EthFilterConfig::default(),
1791 Box::new(TokioTaskExecutor::default()),
1792 );
1793 let filter_inner = eth_filter.inner;
1794
1795 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1796
1797 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1798
1799 let result = cached_mode.next().await.expect("next should succeed");
1801 assert!(result.is_none());
1802 }
1803
1804 #[tokio::test]
1805 async fn test_non_consecutive_headers_after_bloom_filter() {
1806 let provider = MockEthProvider::default();
1807
1808 let mut expected_hashes = vec![];
1810 let mut prev_hash = alloy_primitives::B256::default();
1811
1812 use alloy_consensus::TxLegacy;
1814 use reth_ethereum_primitives::{TransactionSigned, TxType};
1815
1816 let tx_inner = TxLegacy {
1817 chain_id: Some(1),
1818 nonce: 0,
1819 gas_price: 21_000,
1820 gas_limit: 21_000,
1821 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1822 value: alloy_primitives::U256::ZERO,
1823 input: alloy_primitives::Bytes::new(),
1824 };
1825 let signature = alloy_primitives::Signature::test_signature();
1826 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1827
1828 for i in 100u64..=103 {
1829 let header = alloy_consensus::Header {
1830 number: i,
1831 parent_hash: prev_hash,
1832 logs_bloom: if i == 100 || i == 102 {
1834 alloy_primitives::Bloom::from([1u8; 256])
1835 } else {
1836 alloy_primitives::Bloom::default()
1837 },
1838 ..Default::default()
1839 };
1840
1841 let hash = header.hash_slow();
1842 expected_hashes.push(hash);
1843 prev_hash = hash;
1844
1845 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1847
1848 let block = reth_ethereum_primitives::Block {
1849 header,
1850 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1851 };
1852 provider.add_block(hash, block);
1853 }
1854
1855 let mock_log = alloy_primitives::Log {
1857 address: alloy_primitives::Address::ZERO,
1858 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1859 };
1860
1861 let receipt = reth_ethereum_primitives::Receipt {
1862 tx_type: TxType::Legacy,
1863 cumulative_gas_used: 21_000,
1864 logs: vec![mock_log],
1865 success: true,
1866 };
1867
1868 provider.add_receipts(100, vec![receipt.clone()]);
1869 provider.add_receipts(101, vec![]);
1870 provider.add_receipts(102, vec![receipt.clone()]);
1871 provider.add_receipts(103, vec![]);
1872
1873 use reth_db_api::models::StoredBlockBodyIndices;
1875 provider
1876 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1877 provider
1878 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1879 provider
1880 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1881 provider
1882 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1883
1884 let eth_api = build_test_eth_api(provider);
1885 let eth_filter = EthFilter::new(
1886 eth_api,
1887 EthFilterConfig::default(),
1888 Box::new(TokioTaskExecutor::default()),
1889 );
1890
1891 let filter = Filter::default();
1893
1894 let logs = eth_filter
1896 .inner
1897 .clone()
1898 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1899 .await
1900 .expect("should succeed");
1901
1902 assert_eq!(logs.len(), 2);
1904
1905 assert_eq!(logs[0].block_number, Some(100));
1906 assert_eq!(logs[1].block_number, Some(102));
1907
1908 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1912}