1use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8 PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12 future::TryFutureExt,
13 stream::{FuturesOrdered, StreamExt},
14 Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21 helpers::{EthBlocks, LoadReceipt},
22 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23 RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32 ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::Runtime;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37 collections::{HashMap, VecDeque},
38 fmt,
39 iter::{Peekable, StepBy},
40 ops::RangeInclusive,
41 pin::Pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 sync::{mpsc::Receiver, oneshot, Mutex},
47 time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53 Eth: FullEthApiTypes
54 + RpcNodeCoreExt<Provider: BlockIdReader>
55 + LoadReceipt
56 + EthBlocks
57 + 'static,
58{
59 fn logs(
61 &self,
62 filter: Filter,
63 limits: QueryLimits,
64 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65 trace!(target: "rpc::eth", "Serving eth_getLogs");
66 self.logs_for_filter(filter, limits).map_err(|e| e.into())
67 }
68}
69
70const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
72
73const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
75
76const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
78
79const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;
81
82const MAX_HEADERS_RANGE: u64 = 1_000; const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;
87
88const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91pub struct EthFilter<Eth: EthApiTypes> {
95 inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101 Eth: EthApiTypes,
102{
103 fn clone(&self) -> Self {
104 Self { inner: self.inner.clone() }
105 }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110 Eth: EthApiTypes + 'static,
111{
112 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self {
140 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141 config;
142 let inner = EthFilterInner {
143 eth_api,
144 active_filters: ActiveFilters::new(),
145 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146 max_headers_range: MAX_HEADERS_RANGE,
147 task_spawner,
148 stale_filter_ttl,
149 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150 };
151
152 let eth_filter = Self { inner: Arc::new(inner) };
153
154 let this = eth_filter.clone();
155 eth_filter.inner.task_spawner.spawn_critical_task(
156 "eth-filters_stale-filters-clean",
157 async move {
158 this.watch_and_clear_stale_filters().await;
159 },
160 );
161
162 eth_filter
163 }
164
165 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167 &self.inner.active_filters
168 }
169
170 async fn watch_and_clear_stale_filters(&self) {
173 let mut interval = tokio::time::interval_at(
174 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175 self.inner.stale_filter_ttl,
176 );
177 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178 loop {
179 interval.tick().await;
180 self.clear_stale_filters(Instant::now()).await;
181 }
182 }
183
184 pub async fn clear_stale_filters(&self, now: Instant) {
187 trace!(target: "rpc::eth", "clear stale filters");
188 let mut filters = self.active_filters().inner.lock().await;
189 filters.retain(|id, filter| {
190 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192 if !is_valid {
193 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194 }
195
196 is_valid
197 });
198 filters.shrink_to_fit();
199 }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205 + RpcNodeCoreExt
206 + LoadReceipt
207 + EthBlocks
208 + 'static,
209{
210 fn provider(&self) -> &Eth::Provider {
212 self.inner.eth_api.provider()
213 }
214
215 fn pool(&self) -> &Eth::Pool {
217 self.inner.eth_api.pool()
218 }
219
220 pub async fn filter_changes(
222 &self,
223 id: FilterId,
224 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225 let info = self.provider().chain_info()?;
226 let best_number = info.best_number;
227
228 let (start_block, kind) = {
231 let mut filters = self.inner.active_filters.inner.lock().await;
232 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234 if filter.block > best_number {
235 return Ok(FilterChanges::Empty)
237 }
238
239 let mut block = best_number + 1;
243 std::mem::swap(&mut filter.block, &mut block);
244 filter.last_poll_timestamp = Instant::now();
245
246 (block, filter.kind.clone())
247 };
248
249 match kind {
250 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251 FilterKind::Block => {
252 let end_block = best_number + 1;
255 let block_hashes =
256 self.provider().canonical_hashes_range(start_block, end_block).map_err(
257 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258 )?;
259 Ok(FilterChanges::Hashes(block_hashes))
260 }
261 FilterKind::Log(filter) => {
262 let (from_block_number, to_block_number) = match filter.block_option {
263 FilterBlockOption::Range { from_block, to_block } => {
264 let from = from_block
265 .map(|num| self.provider().convert_block_number(num))
266 .transpose()?
267 .flatten();
268 let to = to_block
269 .map(|num| self.provider().convert_block_number(num))
270 .transpose()?
271 .flatten();
272 logs_utils::get_filter_block_range(from, to, start_block, info)?
273 }
274 FilterBlockOption::AtBlockHash(block_hash) => {
275 let block_number = self
279 .provider()
280 .block_number(block_hash)?
281 .ok_or(ProviderError::HeaderNotFound(block_hash.into()))?;
282 (block_number, block_number)
283 }
284 };
285 let logs = self
286 .inner
287 .clone()
288 .get_logs_in_block_range(
289 *filter,
290 from_block_number,
291 to_block_number,
292 self.inner.query_limits,
293 )
294 .await?;
295 Ok(FilterChanges::Logs(logs))
296 }
297 }
298 }
299
300 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
306 let filter = {
307 let mut filters = self.inner.active_filters.inner.lock().await;
308 let filter =
309 filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
310 if let FilterKind::Log(ref inner_filter) = filter.kind {
311 filter.last_poll_timestamp = Instant::now();
312 *inner_filter.clone()
313 } else {
314 return Err(EthFilterError::FilterNotFound(id))
316 }
317 };
318
319 self.logs_for_filter(filter, self.inner.query_limits).await
320 }
321
322 async fn logs_for_filter(
324 &self,
325 filter: Filter,
326 limits: QueryLimits,
327 ) -> Result<Vec<Log>, EthFilterError> {
328 self.inner.clone().logs_for_filter(filter, limits).await
329 }
330}
331
332#[async_trait]
333impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
334where
335 Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
336{
337 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
339 trace!(target: "rpc::eth", "Serving eth_newFilter");
340 self.inner
341 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
342 .await
343 }
344
345 async fn new_block_filter(&self) -> RpcResult<FilterId> {
347 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
348 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
349 }
350
351 async fn new_pending_transaction_filter(
353 &self,
354 kind: Option<PendingTransactionFilterKind>,
355 ) -> RpcResult<FilterId> {
356 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
357
358 let transaction_kind = match kind.unwrap_or_default() {
359 PendingTransactionFilterKind::Hashes => {
360 let receiver = self.pool().pending_transactions_listener();
361 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
362 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
363 }
364 PendingTransactionFilterKind::Full => {
365 let stream = self.pool().new_pending_pool_transactions_listener();
366 let full_txs_receiver = FullTransactionsReceiver::new(
367 stream,
368 dyn_clone::clone(self.inner.eth_api.converter()),
369 );
370 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
371 full_txs_receiver,
372 )))
373 }
374 };
375
376 self.inner.install_filter(transaction_kind).await
378 }
379
380 async fn filter_changes(
382 &self,
383 id: FilterId,
384 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
385 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
386 Ok(Self::filter_changes(self, id).await?)
387 }
388
389 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
395 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
396 Ok(Self::filter_logs(self, id).await?)
397 }
398
399 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
401 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
402 let mut filters = self.inner.active_filters.inner.lock().await;
403 if filters.remove(&id).is_some() {
404 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
405 Ok(true)
406 } else {
407 Ok(false)
408 }
409 }
410
411 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
415 trace!(target: "rpc::eth", "Serving eth_getLogs");
416 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
417 }
418}
419
420impl<Eth> std::fmt::Debug for EthFilter<Eth>
421where
422 Eth: EthApiTypes,
423{
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 f.debug_struct("EthFilter").finish_non_exhaustive()
426 }
427}
428
429#[derive(Debug)]
431struct EthFilterInner<Eth: EthApiTypes> {
432 eth_api: Eth,
434 active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
436 id_provider: Arc<dyn IdProvider>,
438 query_limits: QueryLimits,
440 max_headers_range: u64,
442 task_spawner: Runtime,
444 stale_filter_ttl: Duration,
446}
447
448impl<Eth> EthFilterInner<Eth>
449where
450 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
451 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
452 + LoadReceipt
453 + EthBlocks
454 + 'static,
455{
456 fn provider(&self) -> &Eth::Provider {
458 self.eth_api.provider()
459 }
460
461 fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
463 self.eth_api.cache()
464 }
465
466 async fn logs_for_filter(
468 self: Arc<Self>,
469 filter: Filter,
470 limits: QueryLimits,
471 ) -> Result<Vec<Log>, EthFilterError> {
472 match filter.block_option {
473 FilterBlockOption::AtBlockHash(block_hash) => {
474 let Some((receipts, maybe_block)) =
476 self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
477 else {
478 return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
479 };
480
481 let header = if let Some(block) = &maybe_block {
483 block.header().clone()
484 } else {
485 self.provider()
486 .header_by_hash_or_number(block_hash.into())?
487 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?
488 };
489
490 let earliest_block = self.provider().earliest_block_number()?;
492 if header.number() < earliest_block {
493 return Err(EthApiError::PrunedHistoryUnavailable.into());
494 }
495
496 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
497
498 let mut all_logs = Vec::new();
499 append_matching_block_logs(
500 &mut all_logs,
501 maybe_block
502 .map(ProviderOrBlock::Block)
503 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
504 &filter,
505 block_num_hash,
506 &receipts,
507 false,
508 header.timestamp(),
509 )?;
510
511 Ok(all_logs)
512 }
513 FilterBlockOption::Range { from_block, to_block } => {
514 if from_block.is_some_and(|b| b.is_pending()) {
516 let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
517 if !(to_block.is_pending() || to_block.is_number()) {
518 return Ok(Vec::new());
520 }
521 if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
523 if let BlockNumberOrTag::Number(to_block) = to_block &&
524 to_block < pending_block.block.number()
525 {
526 return Ok(Vec::new());
528 }
529
530 let info = self.provider().chain_info()?;
531 if pending_block.block.number() > info.best_number {
532 let mut all_logs = Vec::new();
534 let timestamp = pending_block.block.timestamp();
535 let block_num_hash = pending_block.block.num_hash();
536 append_matching_block_logs(
537 &mut all_logs,
538 ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
539 &filter,
540 block_num_hash,
541 &pending_block.receipts,
542 false, timestamp,
544 )?;
545 return Ok(all_logs);
546 }
547 }
548 }
549
550 let info = self.provider().chain_info()?;
551 let start_block = info.best_number;
552 let from = from_block
553 .map(|num| self.provider().convert_block_number(num))
554 .transpose()?
555 .flatten();
556 let to = to_block
557 .map(|num| self.provider().convert_block_number(num))
558 .transpose()?
559 .flatten();
560
561 if let Some(t) = to &&
563 t > info.best_number
564 {
565 return Err(EthFilterError::BlockRangeExceedsHead);
566 }
567
568 if let Some(f) = from &&
569 f > info.best_number
570 {
571 return Ok(Vec::new());
573 }
574
575 let (from_block_number, to_block_number) =
576 logs_utils::get_filter_block_range(from, to, start_block, info)?;
577
578 let earliest_block = self.provider().earliest_block_number()?;
580 if from_block_number < earliest_block {
581 return Err(EthApiError::PrunedHistoryUnavailable.into());
582 }
583
584 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
585 .await
586 }
587 }
588 }
589
590 async fn install_filter(
592 &self,
593 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
594 ) -> RpcResult<FilterId> {
595 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
596 let subscription_id = self.id_provider.next_id();
597
598 let id = match subscription_id {
599 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
600 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
601 };
602 let mut filters = self.active_filters.inner.lock().await;
603 filters.insert(
604 id.clone(),
605 ActiveFilter {
606 block: last_poll_block_number,
607 last_poll_timestamp: Instant::now(),
608 kind,
609 },
610 );
611 Ok(id)
612 }
613
614 async fn get_logs_in_block_range(
620 self: Arc<Self>,
621 filter: Filter,
622 from_block: u64,
623 to_block: u64,
624 limits: QueryLimits,
625 ) -> Result<Vec<Log>, EthFilterError> {
626 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
627
628 if to_block < from_block {
630 return Err(EthFilterError::InvalidBlockRangeParams)
631 }
632
633 if let Some(max_blocks_per_filter) =
634 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
635 {
636 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
637 }
638
639 let (tx, rx) = oneshot::channel();
640 let this = self.clone();
641 self.task_spawner.spawn_blocking_task(async move {
642 let res =
643 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
644 let _ = tx.send(res);
645 });
646
647 rx.await.map_err(|_| EthFilterError::InternalError)?
648 }
649
650 async fn get_logs_in_block_range_inner(
659 self: Arc<Self>,
660 filter: &Filter,
661 from_block: u64,
662 to_block: u64,
663 limits: QueryLimits,
664 ) -> Result<Vec<Log>, EthFilterError> {
665 let mut all_logs = Vec::new();
666 let mut matching_headers = Vec::new();
667
668 let chain_tip = self.provider().best_block_number()?;
670
671 for (from, to) in
673 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
674 {
675 let headers = self.provider().headers_range(from..=to)?;
676
677 let mut headers_iter = headers.into_iter().peekable();
678
679 while let Some(header) = headers_iter.next() {
680 if !filter.matches_bloom(header.logs_bloom()) {
681 continue
682 }
683
684 let current_number = header.number();
685
686 let block_hash = match headers_iter.peek() {
687 Some(next_header) if next_header.number() == current_number + 1 => {
688 next_header.parent_hash()
690 }
691 _ => {
692 header.hash_slow()
694 }
695 };
696
697 matching_headers.push(SealedHeader::new(header, block_hash));
698 }
699 }
700
701 let mut range_mode = RangeMode::new(
703 self.clone(),
704 matching_headers,
705 from_block,
706 to_block,
707 self.max_headers_range,
708 chain_tip,
709 );
710
711 while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
713 range_mode.next().await?
714 {
715 let num_hash = header.num_hash();
716 append_matching_block_logs(
717 &mut all_logs,
718 recovered_block
719 .map(ProviderOrBlock::Block)
720 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
721 filter,
722 num_hash,
723 &receipts,
724 false,
725 header.timestamp(),
726 )?;
727
728 let is_multi_block_range = from_block != to_block;
731 if let Some(max_logs_per_response) = limits.max_logs_per_response &&
732 is_multi_block_range &&
733 all_logs.len() > max_logs_per_response
734 {
735 debug!(
736 target: "rpc::eth::filter",
737 logs_found = all_logs.len(),
738 max_logs_per_response,
739 from_block,
740 to_block = num_hash.number,
741 "Query exceeded max logs per response limit"
742 );
743 return Err(EthFilterError::QueryExceedsMaxResults {
744 max_logs: max_logs_per_response,
745 from_block,
746 to_block: num_hash.number,
747 });
748 }
749 }
750
751 Ok(all_logs)
752 }
753}
754
755#[derive(Debug, Clone, Default)]
757pub struct ActiveFilters<T> {
758 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
759}
760
761impl<T> ActiveFilters<T> {
762 pub fn new() -> Self {
764 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
765 }
766
767 pub async fn contains(&self, id: &FilterId) -> bool {
769 self.inner.lock().await.contains_key(id)
770 }
771
772 pub async fn len(&self) -> usize {
774 self.inner.lock().await.len()
775 }
776
777 pub async fn is_empty(&self) -> bool {
779 self.inner.lock().await.is_empty()
780 }
781
782 pub async fn ids(&self) -> Vec<FilterId> {
784 self.inner.lock().await.keys().cloned().collect()
785 }
786}
787
788#[derive(Debug)]
790struct ActiveFilter<T> {
791 block: u64,
793 last_poll_timestamp: Instant,
795 kind: FilterKind<T>,
797}
798
799#[derive(Debug, Clone)]
801struct PendingTransactionsReceiver {
802 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
803}
804
805impl PendingTransactionsReceiver {
806 fn new(receiver: Receiver<TxHash>) -> Self {
807 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
808 }
809
810 async fn drain<T>(&self) -> FilterChanges<T> {
812 let mut pending_txs = Vec::new();
813 let mut prepared_stream = self.txs_receiver.lock().await;
814
815 while let Ok(tx_hash) = prepared_stream.try_recv() {
816 pending_txs.push(tx_hash);
817 }
818
819 FilterChanges::Hashes(pending_txs)
821 }
822}
823
824#[derive(Debug, Clone)]
826struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
827 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
828 converter: TxCompat,
829}
830
831impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
832where
833 T: PoolTransaction + 'static,
834 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
835{
836 fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
838 Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
839 }
840
841 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
843 let mut pending_txs = Vec::new();
844 let mut prepared_stream = self.txs_stream.lock().await;
845
846 while let Ok(tx) = prepared_stream.try_recv() {
847 match self.converter.fill_pending(tx.transaction.to_consensus()) {
848 Ok(tx) => pending_txs.push(tx),
849 Err(err) => {
850 error!(target: "rpc",
851 %err,
852 "Failed to fill txn with block context"
853 );
854 }
855 }
856 }
857 FilterChanges::Transactions(pending_txs)
858 }
859}
860
861#[async_trait]
863trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
864 async fn drain(&self) -> FilterChanges<T>;
865}
866
867#[async_trait]
868impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
869 for FullTransactionsReceiver<T, TxCompat>
870where
871 T: PoolTransaction + 'static,
872 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
873{
874 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
875 Self::drain(self).await
876 }
877}
878
879#[derive(Debug, Clone)]
885enum PendingTransactionKind<T> {
886 Hashes(PendingTransactionsReceiver),
887 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
888}
889
890impl<T: 'static> PendingTransactionKind<T> {
891 async fn drain(&self) -> FilterChanges<T> {
892 match self {
893 Self::Hashes(receiver) => receiver.drain().await,
894 Self::FullTransaction(receiver) => receiver.drain().await,
895 }
896 }
897}
898
899#[derive(Clone, Debug)]
900enum FilterKind<T> {
901 Log(Box<Filter>),
902 Block,
903 PendingTransaction(PendingTransactionKind<T>),
904}
905
906#[derive(Debug)]
908struct BlockRangeInclusiveIter {
909 iter: StepBy<RangeInclusive<u64>>,
910 step: u64,
911 end: u64,
912}
913
914impl BlockRangeInclusiveIter {
915 fn new(range: RangeInclusive<u64>, step: u64) -> Self {
916 Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
917 }
918}
919
920impl Iterator for BlockRangeInclusiveIter {
921 type Item = (u64, u64);
922
923 fn next(&mut self) -> Option<Self::Item> {
924 let start = self.iter.next()?;
925 let end = (start + self.step).min(self.end);
926 if start > end {
927 return None
928 }
929 Some((start, end))
930 }
931}
932
933#[derive(Debug, thiserror::Error)]
935pub enum EthFilterError {
936 #[error("filter not found")]
938 FilterNotFound(FilterId),
939 #[error("invalid block range params")]
941 InvalidBlockRangeParams,
942 #[error("block range extends beyond current head block")]
944 BlockRangeExceedsHead,
945 #[error("query exceeds max block range {0}")]
947 QueryExceedsMaxBlocks(u64),
948 #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
950 QueryExceedsMaxResults {
951 max_logs: usize,
953 from_block: u64,
955 to_block: u64,
957 },
958 #[error(transparent)]
960 EthAPIError(#[from] EthApiError),
961 #[error("internal filter error")]
963 InternalError,
964}
965
966impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
967 fn from(err: EthFilterError) -> Self {
968 match err {
969 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
970 jsonrpsee::types::error::INVALID_PARAMS_CODE,
971 "filter not found",
972 ),
973 err @ EthFilterError::InternalError => {
974 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
975 }
976 EthFilterError::EthAPIError(err) => err.into(),
977 err @ (EthFilterError::InvalidBlockRangeParams |
978 EthFilterError::QueryExceedsMaxBlocks(_) |
979 EthFilterError::QueryExceedsMaxResults { .. } |
980 EthFilterError::BlockRangeExceedsHead) => {
981 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
982 }
983 }
984 }
985}
986
987impl From<ProviderError> for EthFilterError {
988 fn from(err: ProviderError) -> Self {
989 Self::EthAPIError(err.into())
990 }
991}
992
993impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
994 fn from(err: logs_utils::FilterBlockRangeError) -> Self {
995 match err {
996 logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
997 logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
998 }
999 }
1000}
1001
1002struct ReceiptBlockResult<P>
1005where
1006 P: ReceiptProvider + BlockReader,
1007{
1008 receipts: Arc<Vec<ProviderReceipt<P>>>,
1010 recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
1012 header: SealedHeader<<P as HeaderProvider>::Header>,
1014}
1015
1016enum RangeMode<
1018 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1019 + EthApiTypes
1020 + LoadReceipt
1021 + EthBlocks
1022 + 'static,
1023> {
1024 Cached(CachedMode<Eth>),
1026 Range(RangeBlockMode<Eth>),
1028}
1029
1030impl<
1031 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1032 + EthApiTypes
1033 + LoadReceipt
1034 + EthBlocks
1035 + 'static,
1036 > RangeMode<Eth>
1037{
1038 fn new(
1040 filter_inner: Arc<EthFilterInner<Eth>>,
1041 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1042 from_block: u64,
1043 to_block: u64,
1044 max_headers_range: u64,
1045 chain_tip: u64,
1046 ) -> Self {
1047 let block_count = to_block - from_block + 1;
1048 let distance_from_tip = chain_tip.saturating_sub(to_block);
1049
1050 let use_cached_mode =
1052 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1053
1054 if use_cached_mode && !sealed_headers.is_empty() {
1055 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1056 } else {
1057 Self::Range(RangeBlockMode {
1058 filter_inner,
1059 iter: sealed_headers.into_iter().peekable(),
1060 next: VecDeque::new(),
1061 max_range: max_headers_range as usize,
1062 pending_tasks: FuturesOrdered::new(),
1063 })
1064 }
1065 }
1066
1067 const fn should_use_cached_mode(
1069 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1070 block_count: u64,
1071 distance_from_tip: u64,
1072 ) -> bool {
1073 let bloom_matches = headers.len();
1075
1076 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1078
1079 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1080 }
1081
1082 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1084 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1086 return CACHED_MODE_BLOCK_THRESHOLD;
1087 }
1088
1089 match bloom_matches {
1090 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1091 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1092 _ => CACHED_MODE_BLOCK_THRESHOLD,
1093 }
1094 }
1095
1096 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1098 match self {
1099 Self::Cached(cached) => cached.next().await,
1100 Self::Range(range) => range.next().await,
1101 }
1102 }
1103}
1104
1105struct CachedMode<
1107 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1108 + EthApiTypes
1109 + LoadReceipt
1110 + EthBlocks
1111 + 'static,
1112> {
1113 filter_inner: Arc<EthFilterInner<Eth>>,
1114 headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1115}
1116
1117impl<
1118 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1119 + EthApiTypes
1120 + LoadReceipt
1121 + EthBlocks
1122 + 'static,
1123 > CachedMode<Eth>
1124{
1125 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1126 for header in self.headers_iter.by_ref() {
1127 if let Some((receipts, maybe_block)) =
1129 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1130 {
1131 return Ok(Some(ReceiptBlockResult {
1132 receipts,
1133 recovered_block: maybe_block,
1134 header,
1135 }));
1136 }
1137 }
1138
1139 Ok(None) }
1141}
1142
1143type ReceiptFetchFuture<P> =
1145 Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1146
1147struct RangeBlockMode<
1149 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1150 + EthApiTypes
1151 + LoadReceipt
1152 + EthBlocks
1153 + 'static,
1154> {
1155 filter_inner: Arc<EthFilterInner<Eth>>,
1156 iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1157 next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1158 max_range: usize,
1159 pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1161}
1162
1163impl<
1164 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1165 + EthApiTypes
1166 + LoadReceipt
1167 + EthBlocks
1168 + 'static,
1169 > RangeBlockMode<Eth>
1170{
1171 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1172 loop {
1173 if let Some(result) = self.next.pop_front() {
1175 return Ok(Some(result));
1176 }
1177
1178 if let Some(task_result) = self.pending_tasks.next().await {
1180 self.next.extend(task_result?);
1181 continue;
1182 }
1183
1184 let Some(next_header) = self.iter.next() else {
1186 return Ok(None);
1188 };
1189
1190 let mut range_headers = Vec::with_capacity(self.max_range);
1191 range_headers.push(next_header);
1192
1193 while range_headers.len() < self.max_range {
1195 let Some(peeked) = self.iter.peek() else { break };
1196 let Some(last_header) = range_headers.last() else { break };
1197
1198 let expected_next = last_header.number() + 1;
1199 if peeked.number() != expected_next {
1200 trace!(
1201 target: "rpc::eth::filter",
1202 last_block = last_header.number(),
1203 next_block = peeked.number(),
1204 expected = expected_next,
1205 range_size = range_headers.len(),
1206 "Non-consecutive block detected, stopping range collection"
1207 );
1208 break; }
1210
1211 let Some(next_header) = self.iter.next() else { break };
1212 range_headers.push(next_header);
1213 }
1214
1215 let remaining_headers = self.iter.len() + range_headers.len();
1217 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1218 self.spawn_parallel_tasks(range_headers);
1219 } else {
1221 if let Some(result) = self.process_small_range(range_headers).await? {
1223 return Ok(Some(result));
1224 }
1225 }
1227 }
1228 }
1229
1230 async fn process_small_range(
1234 &mut self,
1235 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1236 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1237 for header in range_headers {
1239 let (maybe_block, maybe_receipts) = self
1241 .filter_inner
1242 .eth_cache()
1243 .maybe_cached_block_and_receipts(header.hash())
1244 .await?;
1245
1246 let receipts = match maybe_receipts {
1247 Some(receipts) => receipts,
1248 None => {
1249 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1251 Some(receipts) => Arc::new(receipts),
1252 None => continue, }
1254 }
1255 };
1256
1257 if !receipts.is_empty() {
1258 self.next.push_back(ReceiptBlockResult {
1259 receipts,
1260 recovered_block: maybe_block,
1261 header,
1262 });
1263 }
1264 }
1265
1266 Ok(self.next.pop_front())
1267 }
1268
1269 fn spawn_parallel_tasks(
1274 &mut self,
1275 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1276 ) {
1277 let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1279 let header_chunks = range_headers
1280 .into_iter()
1281 .chunks(chunk_size)
1282 .into_iter()
1283 .map(|chunk| chunk.collect::<Vec<_>>())
1284 .collect::<Vec<_>>();
1285
1286 for chunk_headers in header_chunks {
1288 let filter_inner = self.filter_inner.clone();
1289 let chunk_task = Box::pin(async move {
1290 let chunk_task = tokio::task::spawn_blocking(move || {
1291 let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1292
1293 for header in chunk_headers {
1294 let receipts = match filter_inner
1297 .provider()
1298 .receipts_by_block(header.hash().into())?
1299 {
1300 Some(receipts) => Arc::new(receipts),
1301 None => continue, };
1303
1304 if !receipts.is_empty() {
1305 chunk_results.push(ReceiptBlockResult {
1306 receipts,
1307 recovered_block: None,
1308 header,
1309 });
1310 }
1311 }
1312
1313 Ok(chunk_results)
1314 });
1315
1316 match chunk_task.await {
1318 Ok(Ok(chunk_results)) => Ok(chunk_results),
1319 Ok(Err(e)) => Err(e),
1320 Err(join_err) => {
1321 trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1322 Err(EthFilterError::InternalError)
1323 }
1324 }
1325 });
1326
1327 self.pending_tasks.push_back(chunk_task);
1328 }
1329 }
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334 use super::*;
1335 use crate::{eth::EthApi, EthApiBuilder};
1336 use alloy_network::Ethereum;
1337 use alloy_primitives::FixedBytes;
1338 use rand::Rng;
1339 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1340 use reth_ethereum_primitives::TxType;
1341 use reth_evm_ethereum::EthEvmConfig;
1342 use reth_network_api::noop::NoopNetwork;
1343 use reth_provider::test_utils::MockEthProvider;
1344 use reth_rpc_convert::RpcConverter;
1345 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1346 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1347 use reth_tasks::Runtime;
1348 use reth_testing_utils::generators;
1349 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1350 use std::{collections::VecDeque, sync::Arc};
1351
1352 #[test]
1353 fn test_block_range_iter() {
1354 let mut rng = generators::rng();
1355
1356 let start = rng.random::<u32>() as u64;
1357 let end = start.saturating_add(rng.random::<u32>() as u64);
1358 let step = rng.random::<u16>() as u64;
1359 let range = start..=end;
1360 let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1361 let (from, mut end) = iter.next().unwrap();
1362 assert_eq!(from, start);
1363 assert_eq!(end, (from + step).min(*range.end()));
1364
1365 for (next_from, next_end) in iter {
1366 assert_eq!(next_from, end + 1);
1368 end = next_end;
1369 }
1370
1371 assert_eq!(end, *range.end());
1372 }
1373
1374 #[expect(clippy::type_complexity)]
1376 fn build_test_eth_api(
1377 provider: MockEthProvider,
1378 ) -> EthApi<
1379 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1380 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1381 > {
1382 EthApiBuilder::new(
1383 provider.clone(),
1384 testing_pool(),
1385 NoopNetwork::default(),
1386 EthEvmConfig::new(provider.chain_spec()),
1387 )
1388 .build()
1389 }
1390
1391 #[tokio::test]
1392 async fn test_range_block_mode_empty_range() {
1393 let provider = MockEthProvider::default();
1394 let eth_api = build_test_eth_api(provider);
1395
1396 let eth_filter =
1397 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1398 let filter_inner = eth_filter.inner;
1399
1400 let headers = vec![];
1401 let max_range = 100;
1402
1403 let mut range_mode = RangeBlockMode {
1404 filter_inner,
1405 iter: headers.into_iter().peekable(),
1406 next: VecDeque::new(),
1407 max_range,
1408 pending_tasks: FuturesOrdered::new(),
1409 };
1410
1411 let result = range_mode.next().await;
1412 assert!(result.is_ok());
1413 assert!(result.unwrap().is_none());
1414 }
1415
1416 #[tokio::test]
1417 async fn test_range_block_mode_queued_results_priority() {
1418 let provider = MockEthProvider::default();
1419 let eth_api = build_test_eth_api(provider);
1420
1421 let eth_filter =
1422 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1423 let filter_inner = eth_filter.inner;
1424
1425 let headers = vec![
1426 SealedHeader::new(
1427 alloy_consensus::Header { number: 100, ..Default::default() },
1428 FixedBytes::random(),
1429 ),
1430 SealedHeader::new(
1431 alloy_consensus::Header { number: 101, ..Default::default() },
1432 FixedBytes::random(),
1433 ),
1434 ];
1435
1436 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1438 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1439
1440 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1442 tx_type: TxType::Legacy,
1443 cumulative_gas_used: 100_000,
1444 logs: vec![],
1445 success: true,
1446 };
1447 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1448 tx_type: TxType::Eip1559,
1449 cumulative_gas_used: 200_000,
1450 logs: vec![],
1451 success: true,
1452 };
1453 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1454 tx_type: TxType::Eip2930,
1455 cumulative_gas_used: 150_000,
1456 logs: vec![],
1457 success: false, };
1459
1460 let mock_result_1 = ReceiptBlockResult {
1461 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1462 recovered_block: None,
1463 header: SealedHeader::new(
1464 alloy_consensus::Header { number: 42, ..Default::default() },
1465 expected_block_hash_1,
1466 ),
1467 };
1468
1469 let mock_result_2 = ReceiptBlockResult {
1470 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1471 recovered_block: None,
1472 header: SealedHeader::new(
1473 alloy_consensus::Header { number: 43, ..Default::default() },
1474 expected_block_hash_2,
1475 ),
1476 };
1477
1478 let mut range_mode = RangeBlockMode {
1479 filter_inner,
1480 iter: headers.into_iter().peekable(),
1481 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1483 pending_tasks: FuturesOrdered::new(),
1484 };
1485
1486 let result1 = range_mode.next().await;
1488 assert!(result1.is_ok());
1489 let receipt_result1 = result1.unwrap().unwrap();
1490 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1491 assert_eq!(receipt_result1.header.number, 42);
1492
1493 assert_eq!(receipt_result1.receipts.len(), 2);
1495 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1496 assert_eq!(
1497 receipt_result1.receipts[0].cumulative_gas_used,
1498 mock_receipt_1.cumulative_gas_used
1499 );
1500 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1501 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1502 assert_eq!(
1503 receipt_result1.receipts[1].cumulative_gas_used,
1504 mock_receipt_2.cumulative_gas_used
1505 );
1506 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1507
1508 let result2 = range_mode.next().await;
1510 assert!(result2.is_ok());
1511 let receipt_result2 = result2.unwrap().unwrap();
1512 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1513 assert_eq!(receipt_result2.header.number, 43);
1514
1515 assert_eq!(receipt_result2.receipts.len(), 1);
1517 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1518 assert_eq!(
1519 receipt_result2.receipts[0].cumulative_gas_used,
1520 mock_receipt_3.cumulative_gas_used
1521 );
1522 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1523
1524 assert!(range_mode.next.is_empty());
1526
1527 let result3 = range_mode.next().await;
1528 assert!(result3.is_ok());
1529 }
1530
1531 #[tokio::test]
1532 async fn test_range_block_mode_single_block_no_receipts() {
1533 let provider = MockEthProvider::default();
1534 let eth_api = build_test_eth_api(provider);
1535
1536 let eth_filter =
1537 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1538 let filter_inner = eth_filter.inner;
1539
1540 let headers = vec![SealedHeader::new(
1541 alloy_consensus::Header { number: 100, ..Default::default() },
1542 FixedBytes::random(),
1543 )];
1544
1545 let mut range_mode = RangeBlockMode {
1546 filter_inner,
1547 iter: headers.into_iter().peekable(),
1548 next: VecDeque::new(),
1549 max_range: 100,
1550 pending_tasks: FuturesOrdered::new(),
1551 };
1552
1553 let result = range_mode.next().await;
1554 assert!(result.is_ok());
1555 }
1556
1557 #[tokio::test]
1558 async fn test_range_block_mode_provider_receipts() {
1559 let provider = MockEthProvider::default();
1560
1561 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1562 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1563 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1564
1565 let block_hash_1 = FixedBytes::random();
1566 let block_hash_2 = FixedBytes::random();
1567 let block_hash_3 = FixedBytes::random();
1568
1569 provider.add_header(block_hash_1, header_1.clone());
1570 provider.add_header(block_hash_2, header_2.clone());
1571 provider.add_header(block_hash_3, header_3.clone());
1572
1573 let mock_log = alloy_primitives::Log {
1575 address: alloy_primitives::Address::ZERO,
1576 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1577 };
1578
1579 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1580 tx_type: TxType::Legacy,
1581 cumulative_gas_used: 21_000,
1582 logs: vec![mock_log.clone()],
1583 success: true,
1584 };
1585 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1586 tx_type: TxType::Eip1559,
1587 cumulative_gas_used: 42_000,
1588 logs: vec![mock_log.clone()],
1589 success: true,
1590 };
1591 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1592 tx_type: TxType::Eip2930,
1593 cumulative_gas_used: 30_000,
1594 logs: vec![mock_log.clone()],
1595 success: false,
1596 };
1597
1598 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1599 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1600
1601 let eth_api = build_test_eth_api(provider);
1602
1603 let eth_filter =
1604 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1605 let filter_inner = eth_filter.inner;
1606
1607 let headers = vec![
1608 SealedHeader::new(header_1, block_hash_1),
1609 SealedHeader::new(header_2, block_hash_2),
1610 SealedHeader::new(header_3, block_hash_3),
1611 ];
1612
1613 let mut range_mode = RangeBlockMode {
1614 filter_inner,
1615 iter: headers.into_iter().peekable(),
1616 next: VecDeque::new(),
1617 max_range: 3, pending_tasks: FuturesOrdered::new(),
1619 };
1620
1621 let result = range_mode.next().await;
1623 assert!(result.is_ok());
1624 let receipt_result = result.unwrap().unwrap();
1625
1626 assert_eq!(receipt_result.header.hash(), block_hash_1);
1627 assert_eq!(receipt_result.header.number, 100);
1628 assert_eq!(receipt_result.receipts.len(), 2);
1629
1630 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1632 assert_eq!(
1633 receipt_result.receipts[0].cumulative_gas_used,
1634 receipt_100_1.cumulative_gas_used
1635 );
1636 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1637
1638 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1639 assert_eq!(
1640 receipt_result.receipts[1].cumulative_gas_used,
1641 receipt_100_2.cumulative_gas_used
1642 );
1643 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1644
1645 let result2 = range_mode.next().await;
1647 assert!(result2.is_ok());
1648 let receipt_result2 = result2.unwrap().unwrap();
1649
1650 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1651 assert_eq!(receipt_result2.header.number, 101);
1652 assert_eq!(receipt_result2.receipts.len(), 1);
1653
1654 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1656 assert_eq!(
1657 receipt_result2.receipts[0].cumulative_gas_used,
1658 receipt_101_1.cumulative_gas_used
1659 );
1660 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1661
1662 let result3 = range_mode.next().await;
1664 assert!(result3.is_ok());
1665 assert!(result3.unwrap().is_none());
1666 }
1667
1668 #[tokio::test]
1669 async fn test_range_block_mode_iterator_exhaustion() {
1670 let provider = MockEthProvider::default();
1671
1672 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1673 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1674
1675 let block_hash_100 = FixedBytes::random();
1676 let block_hash_101 = FixedBytes::random();
1677
1678 provider.add_header(block_hash_100, header_100.clone());
1680 provider.add_header(block_hash_101, header_101.clone());
1681
1682 let mock_receipt = reth_ethereum_primitives::Receipt {
1684 tx_type: TxType::Legacy,
1685 cumulative_gas_used: 21_000,
1686 logs: vec![],
1687 success: true,
1688 };
1689 provider.add_receipts(100, vec![mock_receipt.clone()]);
1690 provider.add_receipts(101, vec![mock_receipt.clone()]);
1691
1692 let eth_api = build_test_eth_api(provider);
1693
1694 let eth_filter =
1695 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1696 let filter_inner = eth_filter.inner;
1697
1698 let headers = vec![
1699 SealedHeader::new(header_100, block_hash_100),
1700 SealedHeader::new(header_101, block_hash_101),
1701 ];
1702
1703 let mut range_mode = RangeBlockMode {
1704 filter_inner,
1705 iter: headers.into_iter().peekable(),
1706 next: VecDeque::new(),
1707 max_range: 1,
1708 pending_tasks: FuturesOrdered::new(),
1709 };
1710
1711 let result1 = range_mode.next().await;
1712 assert!(result1.is_ok());
1713 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1718 assert!(result2.is_ok());
1719 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1723
1724 let result3 = range_mode.next().await;
1726 assert!(result3.is_ok());
1727 assert!(result3.unwrap().is_none());
1728 }
1729
1730 #[tokio::test]
1731 async fn test_cached_mode_with_mock_receipts() {
1732 let test_hash = FixedBytes::from([42u8; 32]);
1734 let test_block_number = 100u64;
1735 let test_header = SealedHeader::new(
1736 alloy_consensus::Header {
1737 number: test_block_number,
1738 gas_used: 50_000,
1739 ..Default::default()
1740 },
1741 test_hash,
1742 );
1743
1744 let mock_log = alloy_primitives::Log {
1746 address: alloy_primitives::Address::ZERO,
1747 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1748 };
1749
1750 let mock_receipt = reth_ethereum_primitives::Receipt {
1751 tx_type: TxType::Legacy,
1752 cumulative_gas_used: 21_000,
1753 logs: vec![mock_log],
1754 success: true,
1755 };
1756
1757 let provider = MockEthProvider::default();
1758 provider.add_header(test_hash, test_header.header().clone());
1759 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1760
1761 let eth_api = build_test_eth_api(provider);
1762 let eth_filter =
1763 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1764 let filter_inner = eth_filter.inner;
1765
1766 let headers = vec![test_header.clone()];
1767
1768 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1769
1770 let result = cached_mode.next().await.expect("next should succeed");
1772 let receipt_block_result = result.expect("should have receipt result");
1773 assert_eq!(receipt_block_result.header.hash(), test_hash);
1774 assert_eq!(receipt_block_result.header.number, test_block_number);
1775 assert_eq!(receipt_block_result.receipts.len(), 1);
1776 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1777 assert_eq!(
1778 receipt_block_result.receipts[0].cumulative_gas_used,
1779 mock_receipt.cumulative_gas_used
1780 );
1781 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1782
1783 let result2 = cached_mode.next().await;
1785 assert!(result2.is_ok());
1786 assert!(result2.unwrap().is_none());
1787 }
1788
1789 #[tokio::test]
1790 async fn test_cached_mode_empty_headers() {
1791 let provider = MockEthProvider::default();
1792 let eth_api = build_test_eth_api(provider);
1793
1794 let eth_filter =
1795 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1796 let filter_inner = eth_filter.inner;
1797
1798 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1799
1800 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1801
1802 let result = cached_mode.next().await.expect("next should succeed");
1804 assert!(result.is_none());
1805 }
1806
1807 #[tokio::test]
1808 async fn test_non_consecutive_headers_after_bloom_filter() {
1809 let provider = MockEthProvider::default();
1810
1811 let mut expected_hashes = vec![];
1813 let mut prev_hash = alloy_primitives::B256::default();
1814
1815 use alloy_consensus::TxLegacy;
1817 use reth_ethereum_primitives::{TransactionSigned, TxType};
1818
1819 let tx_inner = TxLegacy {
1820 chain_id: Some(1),
1821 nonce: 0,
1822 gas_price: 21_000,
1823 gas_limit: 21_000,
1824 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1825 value: alloy_primitives::U256::ZERO,
1826 input: alloy_primitives::Bytes::new(),
1827 };
1828 let signature = alloy_primitives::Signature::test_signature();
1829 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1830
1831 for i in 100u64..=103 {
1832 let header = alloy_consensus::Header {
1833 number: i,
1834 parent_hash: prev_hash,
1835 logs_bloom: if i == 100 || i == 102 {
1837 alloy_primitives::Bloom::from([1u8; 256])
1838 } else {
1839 alloy_primitives::Bloom::default()
1840 },
1841 ..Default::default()
1842 };
1843
1844 let hash = header.hash_slow();
1845 expected_hashes.push(hash);
1846 prev_hash = hash;
1847
1848 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1850
1851 let block = reth_ethereum_primitives::Block {
1852 header,
1853 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1854 };
1855 provider.add_block(hash, block);
1856 }
1857
1858 let mock_log = alloy_primitives::Log {
1860 address: alloy_primitives::Address::ZERO,
1861 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1862 };
1863
1864 let receipt = reth_ethereum_primitives::Receipt {
1865 tx_type: TxType::Legacy,
1866 cumulative_gas_used: 21_000,
1867 logs: vec![mock_log],
1868 success: true,
1869 };
1870
1871 provider.add_receipts(100, vec![receipt.clone()]);
1872 provider.add_receipts(101, vec![]);
1873 provider.add_receipts(102, vec![receipt.clone()]);
1874 provider.add_receipts(103, vec![]);
1875
1876 use reth_db_api::models::StoredBlockBodyIndices;
1878 provider
1879 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1880 provider
1881 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1882 provider
1883 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1884 provider
1885 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1886
1887 let eth_api = build_test_eth_api(provider);
1888 let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1889
1890 let filter = Filter::default();
1892
1893 let logs = eth_filter
1895 .inner
1896 .clone()
1897 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1898 .await
1899 .expect("should succeed");
1900
1901 assert_eq!(logs.len(), 2);
1903
1904 assert_eq!(logs[0].block_number, Some(100));
1905 assert_eq!(logs[1].block_number, Some(102));
1906
1907 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1911}