1use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8 PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12 future::TryFutureExt,
13 stream::{FuturesOrdered, StreamExt},
14 Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21 helpers::{EthBlocks, LoadReceipt},
22 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23 RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32 ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::Runtime;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37 collections::{HashMap, VecDeque},
38 fmt,
39 iter::{Peekable, StepBy},
40 ops::RangeInclusive,
41 pin::Pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 sync::{mpsc::Receiver, oneshot, Mutex},
47 time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53 Eth: FullEthApiTypes
54 + RpcNodeCoreExt<Provider: BlockIdReader>
55 + LoadReceipt
56 + EthBlocks
57 + 'static,
58{
59 fn logs(
61 &self,
62 filter: Filter,
63 limits: QueryLimits,
64 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65 trace!(target: "rpc::eth", "Serving eth_getLogs");
66 self.logs_for_filter(filter, limits).map_err(|e| e.into())
67 }
68}
69
70const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
72
73const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
75
76const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
78
79const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;
81
82const MAX_HEADERS_RANGE: u64 = 1_000; const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;
87
88const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91pub struct EthFilter<Eth: EthApiTypes> {
95 inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101 Eth: EthApiTypes,
102{
103 fn clone(&self) -> Self {
104 Self { inner: self.inner.clone() }
105 }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110 Eth: EthApiTypes + 'static,
111{
112 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self {
140 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141 config;
142 let inner = EthFilterInner {
143 eth_api,
144 active_filters: ActiveFilters::new(),
145 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146 max_headers_range: MAX_HEADERS_RANGE,
147 task_spawner,
148 stale_filter_ttl,
149 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150 };
151
152 let eth_filter = Self { inner: Arc::new(inner) };
153
154 let this = eth_filter.clone();
155 eth_filter.inner.task_spawner.spawn_critical_task(
156 "eth-filters_stale-filters-clean",
157 async move {
158 this.watch_and_clear_stale_filters().await;
159 },
160 );
161
162 eth_filter
163 }
164
165 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167 &self.inner.active_filters
168 }
169
170 async fn watch_and_clear_stale_filters(&self) {
173 let mut interval = tokio::time::interval_at(
174 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175 self.inner.stale_filter_ttl,
176 );
177 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178 loop {
179 interval.tick().await;
180 self.clear_stale_filters(Instant::now()).await;
181 }
182 }
183
184 pub async fn clear_stale_filters(&self, now: Instant) {
187 trace!(target: "rpc::eth", "clear stale filters");
188 let mut filters = self.active_filters().inner.lock().await;
189 filters.retain(|id, filter| {
190 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192 if !is_valid {
193 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194 }
195
196 is_valid
197 });
198 filters.shrink_to_fit();
199 }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205 + RpcNodeCoreExt
206 + LoadReceipt
207 + EthBlocks
208 + 'static,
209{
210 fn provider(&self) -> &Eth::Provider {
212 self.inner.eth_api.provider()
213 }
214
215 fn pool(&self) -> &Eth::Pool {
217 self.inner.eth_api.pool()
218 }
219
220 pub async fn filter_changes(
222 &self,
223 id: FilterId,
224 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225 let info = self.provider().chain_info()?;
226 let best_number = info.best_number;
227
228 let (start_block, kind) = {
231 let mut filters = self.inner.active_filters.inner.lock().await;
232 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234 if filter.block > best_number {
235 return Ok(FilterChanges::Empty)
237 }
238
239 let mut block = best_number + 1;
243 std::mem::swap(&mut filter.block, &mut block);
244 filter.last_poll_timestamp = Instant::now();
245
246 (block, filter.kind.clone())
247 };
248
249 match kind {
250 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251 FilterKind::Block => {
252 let end_block = best_number + 1;
255 let block_hashes =
256 self.provider().canonical_hashes_range(start_block, end_block).map_err(
257 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258 )?;
259 Ok(FilterChanges::Hashes(block_hashes))
260 }
261 FilterKind::Log(filter) => {
262 let (from_block_number, to_block_number) = match filter.block_option {
263 FilterBlockOption::Range { from_block, to_block } => {
264 let from = from_block
265 .map(|num| self.provider().convert_block_number(num))
266 .transpose()?
267 .flatten();
268 let to = to_block
269 .map(|num| self.provider().convert_block_number(num))
270 .transpose()?
271 .flatten();
272 logs_utils::get_filter_block_range(from, to, start_block, info)?
273 }
274 FilterBlockOption::AtBlockHash(block_hash) => {
275 let block_number = self
279 .provider()
280 .block_number(block_hash)?
281 .ok_or(ProviderError::HeaderNotFound(block_hash.into()))?;
282 (block_number, block_number)
283 }
284 };
285 let logs = self
286 .inner
287 .clone()
288 .get_logs_in_block_range(
289 *filter,
290 from_block_number,
291 to_block_number,
292 self.inner.query_limits,
293 )
294 .await?;
295 Ok(FilterChanges::Logs(logs))
296 }
297 }
298 }
299
300 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
306 let filter = {
307 let mut filters = self.inner.active_filters.inner.lock().await;
308 let filter =
309 filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
310 if let FilterKind::Log(ref inner_filter) = filter.kind {
311 filter.last_poll_timestamp = Instant::now();
312 *inner_filter.clone()
313 } else {
314 return Err(EthFilterError::FilterNotFound(id))
316 }
317 };
318
319 self.logs_for_filter(filter, self.inner.query_limits).await
320 }
321
322 async fn logs_for_filter(
324 &self,
325 filter: Filter,
326 limits: QueryLimits,
327 ) -> Result<Vec<Log>, EthFilterError> {
328 self.inner.clone().logs_for_filter(filter, limits).await
329 }
330}
331
332#[async_trait]
333impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
334where
335 Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
336{
337 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
339 trace!(target: "rpc::eth", "Serving eth_newFilter");
340 self.inner
341 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
342 .await
343 }
344
345 async fn new_block_filter(&self) -> RpcResult<FilterId> {
347 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
348 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
349 }
350
351 async fn new_pending_transaction_filter(
353 &self,
354 kind: Option<PendingTransactionFilterKind>,
355 ) -> RpcResult<FilterId> {
356 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
357
358 let transaction_kind = match kind.unwrap_or_default() {
359 PendingTransactionFilterKind::Hashes => {
360 let receiver = self.pool().pending_transactions_listener();
361 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
362 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
363 }
364 PendingTransactionFilterKind::Full => {
365 let stream = self.pool().new_pending_pool_transactions_listener();
366 let full_txs_receiver = FullTransactionsReceiver::new(
367 stream,
368 dyn_clone::clone(self.inner.eth_api.converter()),
369 );
370 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
371 full_txs_receiver,
372 )))
373 }
374 };
375
376 self.inner.install_filter(transaction_kind).await
378 }
379
380 async fn filter_changes(
382 &self,
383 id: FilterId,
384 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
385 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
386 Ok(Self::filter_changes(self, id).await?)
387 }
388
389 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
395 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
396 Ok(Self::filter_logs(self, id).await?)
397 }
398
399 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
401 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
402 let mut filters = self.inner.active_filters.inner.lock().await;
403 if filters.remove(&id).is_some() {
404 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
405 Ok(true)
406 } else {
407 Ok(false)
408 }
409 }
410
411 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
415 trace!(target: "rpc::eth", "Serving eth_getLogs");
416 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
417 }
418}
419
420impl<Eth> std::fmt::Debug for EthFilter<Eth>
421where
422 Eth: EthApiTypes,
423{
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 f.debug_struct("EthFilter").finish_non_exhaustive()
426 }
427}
428
429#[derive(Debug)]
431struct EthFilterInner<Eth: EthApiTypes> {
432 eth_api: Eth,
434 active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
436 id_provider: Arc<dyn IdProvider>,
438 query_limits: QueryLimits,
440 max_headers_range: u64,
442 task_spawner: Runtime,
444 stale_filter_ttl: Duration,
446}
447
448impl<Eth> EthFilterInner<Eth>
449where
450 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
451 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
452 + LoadReceipt
453 + EthBlocks
454 + 'static,
455{
456 fn provider(&self) -> &Eth::Provider {
458 self.eth_api.provider()
459 }
460
461 fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
463 self.eth_api.cache()
464 }
465
466 async fn logs_for_filter(
468 self: Arc<Self>,
469 filter: Filter,
470 limits: QueryLimits,
471 ) -> Result<Vec<Log>, EthFilterError> {
472 match filter.block_option {
473 FilterBlockOption::AtBlockHash(block_hash) => {
474 let Some((receipts, maybe_block)) =
476 self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
477 else {
478 return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
479 };
480
481 let (block_number, block_timestamp) = if let Some(block) = &maybe_block {
483 (block.header().number(), block.header().timestamp())
484 } else {
485 let header = self
486 .provider()
487 .header_by_hash_or_number(block_hash.into())?
488 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
489 (header.number(), header.timestamp())
490 };
491
492 let earliest_block = self.provider().earliest_block_number()?;
494 if block_number < earliest_block {
495 return Err(EthApiError::PrunedHistoryUnavailable.into());
496 }
497
498 let block_num_hash = BlockNumHash::new(block_number, block_hash);
499
500 let mut all_logs = Vec::new();
501 append_matching_block_logs(
502 &mut all_logs,
503 maybe_block
504 .map(ProviderOrBlock::Block)
505 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
506 &filter,
507 block_num_hash,
508 &receipts,
509 false,
510 block_timestamp,
511 )?;
512
513 Ok(all_logs)
514 }
515 FilterBlockOption::Range { from_block, to_block } => {
516 if from_block.is_some_and(|b| b.is_pending()) {
518 let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
519 if !(to_block.is_pending() || to_block.is_number()) {
520 return Ok(Vec::new());
522 }
523 if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
525 if let BlockNumberOrTag::Number(to_block) = to_block &&
526 to_block < pending_block.block.number()
527 {
528 return Ok(Vec::new());
530 }
531
532 let info = self.provider().chain_info()?;
533 if pending_block.block.number() > info.best_number {
534 let mut all_logs = Vec::new();
536 let timestamp = pending_block.block.timestamp();
537 let block_num_hash = pending_block.block.num_hash();
538 append_matching_block_logs(
539 &mut all_logs,
540 ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
541 &filter,
542 block_num_hash,
543 &pending_block.receipts,
544 false, timestamp,
546 )?;
547 return Ok(all_logs);
548 }
549 }
550 }
551
552 let info = self.provider().chain_info()?;
553 let start_block = info.best_number;
554 let from = from_block
555 .map(|num| self.provider().convert_block_number(num))
556 .transpose()?
557 .flatten();
558 let to = to_block
559 .map(|num| self.provider().convert_block_number(num))
560 .transpose()?
561 .flatten();
562
563 if let Some(t) = to &&
565 t > info.best_number
566 {
567 return Err(EthFilterError::BlockRangeExceedsHead {
568 requested: t,
569 head: info.best_number,
570 });
571 }
572
573 if let Some(f) = from &&
574 f > info.best_number
575 {
576 return Ok(Vec::new());
578 }
579
580 let (from_block_number, to_block_number) =
581 logs_utils::get_filter_block_range(from, to, start_block, info)?;
582
583 let earliest_block = self.provider().earliest_block_number()?;
585 if from_block_number < earliest_block {
586 return Err(EthApiError::PrunedHistoryUnavailable.into());
587 }
588
589 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
590 .await
591 }
592 }
593 }
594
595 async fn install_filter(
597 &self,
598 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
599 ) -> RpcResult<FilterId> {
600 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
601 let subscription_id = self.id_provider.next_id();
602
603 let id = match subscription_id {
604 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
605 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
606 };
607 let mut filters = self.active_filters.inner.lock().await;
608 filters.insert(
609 id.clone(),
610 ActiveFilter {
611 block: last_poll_block_number,
612 last_poll_timestamp: Instant::now(),
613 kind,
614 },
615 );
616 Ok(id)
617 }
618
619 async fn get_logs_in_block_range(
625 self: Arc<Self>,
626 filter: Filter,
627 from_block: u64,
628 to_block: u64,
629 limits: QueryLimits,
630 ) -> Result<Vec<Log>, EthFilterError> {
631 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
632
633 if to_block < from_block {
635 return Err(EthFilterError::InvalidBlockRangeParams)
636 }
637
638 if let Some(max_blocks_per_filter) =
639 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
640 {
641 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
642 }
643
644 let (tx, rx) = oneshot::channel();
645 let this = self.clone();
646 self.task_spawner.spawn_blocking_task(async move {
647 let res =
648 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
649 let _ = tx.send(res);
650 });
651
652 rx.await.map_err(|_| EthFilterError::InternalError)?
653 }
654
655 async fn get_logs_in_block_range_inner(
664 self: Arc<Self>,
665 filter: &Filter,
666 from_block: u64,
667 to_block: u64,
668 limits: QueryLimits,
669 ) -> Result<Vec<Log>, EthFilterError> {
670 let mut all_logs = Vec::new();
671 let mut matching_headers = Vec::new();
672
673 let chain_tip = self.provider().best_block_number()?;
675
676 for (from, to) in
678 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
679 {
680 let headers = self.provider().headers_range(from..=to)?;
681
682 let mut headers_iter = headers.into_iter().peekable();
683
684 while let Some(header) = headers_iter.next() {
685 if !filter.matches_bloom(header.logs_bloom()) {
686 continue
687 }
688
689 let current_number = header.number();
690
691 let block_hash = match headers_iter.peek() {
692 Some(next_header) if next_header.number() == current_number + 1 => {
693 next_header.parent_hash()
695 }
696 _ => {
697 header.hash_slow()
699 }
700 };
701
702 matching_headers.push(SealedHeader::new(header, block_hash));
703 }
704 }
705
706 let mut range_mode = RangeMode::new(
708 self.clone(),
709 matching_headers,
710 from_block,
711 to_block,
712 self.max_headers_range,
713 chain_tip,
714 );
715
716 while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
718 range_mode.next().await?
719 {
720 let num_hash = header.num_hash();
721 append_matching_block_logs(
722 &mut all_logs,
723 recovered_block
724 .map(ProviderOrBlock::Block)
725 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
726 filter,
727 num_hash,
728 &receipts,
729 false,
730 header.timestamp(),
731 )?;
732
733 let is_multi_block_range = from_block != to_block;
736 if let Some(max_logs_per_response) = limits.max_logs_per_response &&
737 is_multi_block_range &&
738 all_logs.len() > max_logs_per_response
739 {
740 let retry_to_block =
741 if num_hash.number == from_block { from_block } else { num_hash.number - 1 };
742
743 debug!(
744 target: "rpc::eth::filter",
745 logs_found = all_logs.len(),
746 max_logs_per_response,
747 from_block,
748 to_block = retry_to_block,
749 "Query exceeded max logs per response limit"
750 );
751 return Err(EthFilterError::QueryExceedsMaxResults {
752 max_logs: max_logs_per_response,
753 from_block,
754 to_block: retry_to_block,
755 });
756 }
757 }
758
759 Ok(all_logs)
760 }
761}
762
763#[derive(Debug, Clone, Default)]
765pub struct ActiveFilters<T> {
766 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
767}
768
769impl<T> ActiveFilters<T> {
770 pub fn new() -> Self {
772 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
773 }
774
775 pub async fn contains(&self, id: &FilterId) -> bool {
777 self.inner.lock().await.contains_key(id)
778 }
779
780 pub async fn len(&self) -> usize {
782 self.inner.lock().await.len()
783 }
784
785 pub async fn is_empty(&self) -> bool {
787 self.inner.lock().await.is_empty()
788 }
789
790 pub async fn ids(&self) -> Vec<FilterId> {
792 self.inner.lock().await.keys().cloned().collect()
793 }
794}
795
796#[derive(Debug)]
798struct ActiveFilter<T> {
799 block: u64,
801 last_poll_timestamp: Instant,
803 kind: FilterKind<T>,
805}
806
807#[derive(Debug, Clone)]
809struct PendingTransactionsReceiver {
810 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
811}
812
813impl PendingTransactionsReceiver {
814 fn new(receiver: Receiver<TxHash>) -> Self {
815 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
816 }
817
818 async fn drain<T>(&self) -> FilterChanges<T> {
820 let mut pending_txs = Vec::new();
821 let mut prepared_stream = self.txs_receiver.lock().await;
822
823 while let Ok(tx_hash) = prepared_stream.try_recv() {
824 pending_txs.push(tx_hash);
825 }
826
827 FilterChanges::Hashes(pending_txs)
829 }
830}
831
832#[derive(Debug, Clone)]
834struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
835 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
836 converter: TxCompat,
837}
838
839impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
840where
841 T: PoolTransaction + 'static,
842 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
843{
844 fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
846 Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
847 }
848
849 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
851 let mut pending_txs = Vec::new();
852 let mut prepared_stream = self.txs_stream.lock().await;
853
854 while let Ok(tx) = prepared_stream.try_recv() {
855 match self.converter.fill_pending(tx.transaction.to_consensus()) {
856 Ok(tx) => pending_txs.push(tx),
857 Err(err) => {
858 error!(target: "rpc",
859 %err,
860 "Failed to fill txn with block context"
861 );
862 }
863 }
864 }
865 FilterChanges::Transactions(pending_txs)
866 }
867}
868
869#[async_trait]
871trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
872 async fn drain(&self) -> FilterChanges<T>;
873}
874
875#[async_trait]
876impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
877 for FullTransactionsReceiver<T, TxCompat>
878where
879 T: PoolTransaction + 'static,
880 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
881{
882 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
883 Self::drain(self).await
884 }
885}
886
887#[derive(Debug, Clone)]
893enum PendingTransactionKind<T> {
894 Hashes(PendingTransactionsReceiver),
895 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
896}
897
898impl<T: 'static> PendingTransactionKind<T> {
899 async fn drain(&self) -> FilterChanges<T> {
900 match self {
901 Self::Hashes(receiver) => receiver.drain().await,
902 Self::FullTransaction(receiver) => receiver.drain().await,
903 }
904 }
905}
906
907#[derive(Clone, Debug)]
908enum FilterKind<T> {
909 Log(Box<Filter>),
910 Block,
911 PendingTransaction(PendingTransactionKind<T>),
912}
913
914#[derive(Debug)]
916struct BlockRangeInclusiveIter {
917 iter: StepBy<RangeInclusive<u64>>,
918 step: u64,
919 end: u64,
920}
921
922impl BlockRangeInclusiveIter {
923 fn new(range: RangeInclusive<u64>, step: u64) -> Self {
924 Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
925 }
926}
927
928impl Iterator for BlockRangeInclusiveIter {
929 type Item = (u64, u64);
930
931 fn next(&mut self) -> Option<Self::Item> {
932 let start = self.iter.next()?;
933 let end = (start + self.step).min(self.end);
934 if start > end {
935 return None
936 }
937 Some((start, end))
938 }
939}
940
941#[derive(Debug, thiserror::Error)]
943pub enum EthFilterError {
944 #[error("filter not found")]
946 FilterNotFound(FilterId),
947 #[error("invalid block range params")]
949 InvalidBlockRangeParams,
950 #[error("block range extends beyond current head block: requested {requested}, head {head}")]
952 BlockRangeExceedsHead {
953 requested: u64,
955 head: u64,
957 },
958 #[error("query exceeds max block range {0}")]
960 QueryExceedsMaxBlocks(u64),
961 #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
963 QueryExceedsMaxResults {
964 max_logs: usize,
966 from_block: u64,
968 to_block: u64,
970 },
971 #[error(transparent)]
973 EthAPIError(#[from] EthApiError),
974 #[error("internal filter error")]
976 InternalError,
977}
978
979impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
980 fn from(err: EthFilterError) -> Self {
981 match err {
982 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
983 jsonrpsee::types::error::INVALID_PARAMS_CODE,
984 "filter not found",
985 ),
986 err @ EthFilterError::InternalError => {
987 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
988 }
989 EthFilterError::EthAPIError(err) => err.into(),
990 err @ (EthFilterError::InvalidBlockRangeParams |
991 EthFilterError::QueryExceedsMaxBlocks(_) |
992 EthFilterError::QueryExceedsMaxResults { .. } |
993 EthFilterError::BlockRangeExceedsHead { .. }) => {
994 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
995 }
996 }
997 }
998}
999
1000impl From<ProviderError> for EthFilterError {
1001 fn from(err: ProviderError) -> Self {
1002 Self::EthAPIError(err.into())
1003 }
1004}
1005
1006impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
1007 fn from(err: logs_utils::FilterBlockRangeError) -> Self {
1008 match err {
1009 logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
1010 logs_utils::FilterBlockRangeError::BlockRangeExceedsHead { requested, head } => {
1011 Self::BlockRangeExceedsHead { requested, head }
1012 }
1013 }
1014 }
1015}
1016
1017struct ReceiptBlockResult<P>
1020where
1021 P: ReceiptProvider + BlockReader,
1022{
1023 receipts: Arc<Vec<ProviderReceipt<P>>>,
1025 recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
1027 header: SealedHeader<<P as HeaderProvider>::Header>,
1029}
1030
1031enum RangeMode<
1033 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1034 + EthApiTypes
1035 + LoadReceipt
1036 + EthBlocks
1037 + 'static,
1038> {
1039 Cached(CachedMode<Eth>),
1041 Range(RangeBlockMode<Eth>),
1043}
1044
1045impl<
1046 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1047 + EthApiTypes
1048 + LoadReceipt
1049 + EthBlocks
1050 + 'static,
1051 > RangeMode<Eth>
1052{
1053 fn new(
1055 filter_inner: Arc<EthFilterInner<Eth>>,
1056 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1057 from_block: u64,
1058 to_block: u64,
1059 max_headers_range: u64,
1060 chain_tip: u64,
1061 ) -> Self {
1062 let block_count = to_block - from_block + 1;
1063 let distance_from_tip = chain_tip.saturating_sub(to_block);
1064
1065 let use_cached_mode =
1067 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1068
1069 if use_cached_mode && !sealed_headers.is_empty() {
1070 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1071 } else {
1072 Self::Range(RangeBlockMode {
1073 filter_inner,
1074 iter: sealed_headers.into_iter().peekable(),
1075 next: VecDeque::new(),
1076 max_range: max_headers_range as usize,
1077 pending_tasks: FuturesOrdered::new(),
1078 })
1079 }
1080 }
1081
1082 const fn should_use_cached_mode(
1084 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1085 block_count: u64,
1086 distance_from_tip: u64,
1087 ) -> bool {
1088 let bloom_matches = headers.len();
1090
1091 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1093
1094 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1095 }
1096
1097 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1099 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1101 return CACHED_MODE_BLOCK_THRESHOLD;
1102 }
1103
1104 match bloom_matches {
1105 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1106 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1107 _ => CACHED_MODE_BLOCK_THRESHOLD,
1108 }
1109 }
1110
1111 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1113 match self {
1114 Self::Cached(cached) => cached.next().await,
1115 Self::Range(range) => range.next().await,
1116 }
1117 }
1118}
1119
1120struct CachedMode<
1122 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1123 + EthApiTypes
1124 + LoadReceipt
1125 + EthBlocks
1126 + 'static,
1127> {
1128 filter_inner: Arc<EthFilterInner<Eth>>,
1129 headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1130}
1131
1132impl<
1133 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1134 + EthApiTypes
1135 + LoadReceipt
1136 + EthBlocks
1137 + 'static,
1138 > CachedMode<Eth>
1139{
1140 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1141 for header in self.headers_iter.by_ref() {
1142 if let Some((receipts, maybe_block)) =
1144 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1145 {
1146 return Ok(Some(ReceiptBlockResult {
1147 receipts,
1148 recovered_block: maybe_block,
1149 header,
1150 }));
1151 }
1152 }
1153
1154 Ok(None) }
1156}
1157
1158type ReceiptFetchFuture<P> =
1160 Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1161
1162struct RangeBlockMode<
1164 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1165 + EthApiTypes
1166 + LoadReceipt
1167 + EthBlocks
1168 + 'static,
1169> {
1170 filter_inner: Arc<EthFilterInner<Eth>>,
1171 iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1172 next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1173 max_range: usize,
1174 pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1176}
1177
1178impl<
1179 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1180 + EthApiTypes
1181 + LoadReceipt
1182 + EthBlocks
1183 + 'static,
1184 > RangeBlockMode<Eth>
1185{
1186 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1187 loop {
1188 if let Some(result) = self.next.pop_front() {
1190 return Ok(Some(result));
1191 }
1192
1193 if let Some(task_result) = self.pending_tasks.next().await {
1195 self.next.extend(task_result?);
1196 continue;
1197 }
1198
1199 let Some(next_header) = self.iter.next() else {
1201 return Ok(None);
1203 };
1204
1205 let mut range_headers = Vec::with_capacity(self.max_range);
1206 range_headers.push(next_header);
1207
1208 while range_headers.len() < self.max_range {
1210 let Some(peeked) = self.iter.peek() else { break };
1211 let Some(last_header) = range_headers.last() else { break };
1212
1213 let expected_next = last_header.number() + 1;
1214 if peeked.number() != expected_next {
1215 trace!(
1216 target: "rpc::eth::filter",
1217 last_block = last_header.number(),
1218 next_block = peeked.number(),
1219 expected = expected_next,
1220 range_size = range_headers.len(),
1221 "Non-consecutive block detected, stopping range collection"
1222 );
1223 break; }
1225
1226 let Some(next_header) = self.iter.next() else { break };
1227 range_headers.push(next_header);
1228 }
1229
1230 let remaining_headers = self.iter.len() + range_headers.len();
1232 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1233 self.spawn_parallel_tasks(range_headers);
1234 } else {
1236 if let Some(result) = self.process_small_range(range_headers).await? {
1238 return Ok(Some(result));
1239 }
1240 }
1242 }
1243 }
1244
1245 async fn process_small_range(
1249 &mut self,
1250 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1251 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1252 for header in range_headers {
1254 let (maybe_block, maybe_receipts) = self
1256 .filter_inner
1257 .eth_cache()
1258 .maybe_cached_block_and_receipts(header.hash())
1259 .await?;
1260
1261 let receipts = match maybe_receipts {
1262 Some(receipts) => receipts,
1263 None => {
1264 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1266 Some(receipts) => Arc::new(receipts),
1267 None => continue, }
1269 }
1270 };
1271
1272 if !receipts.is_empty() {
1273 self.next.push_back(ReceiptBlockResult {
1274 receipts,
1275 recovered_block: maybe_block,
1276 header,
1277 });
1278 }
1279 }
1280
1281 Ok(self.next.pop_front())
1282 }
1283
1284 fn spawn_parallel_tasks(
1289 &mut self,
1290 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1291 ) {
1292 let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1294 let header_chunks = range_headers
1295 .into_iter()
1296 .chunks(chunk_size)
1297 .into_iter()
1298 .map(|chunk| chunk.collect::<Vec<_>>())
1299 .collect::<Vec<_>>();
1300
1301 for chunk_headers in header_chunks {
1303 let filter_inner = self.filter_inner.clone();
1304 let chunk_task = Box::pin(async move {
1305 let chunk_task = tokio::task::spawn_blocking(move || {
1306 let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1307
1308 for header in chunk_headers {
1309 let receipts = match filter_inner
1312 .provider()
1313 .receipts_by_block(header.hash().into())?
1314 {
1315 Some(receipts) => Arc::new(receipts),
1316 None => continue, };
1318
1319 if !receipts.is_empty() {
1320 chunk_results.push(ReceiptBlockResult {
1321 receipts,
1322 recovered_block: None,
1323 header,
1324 });
1325 }
1326 }
1327
1328 Ok(chunk_results)
1329 });
1330
1331 match chunk_task.await {
1333 Ok(Ok(chunk_results)) => Ok(chunk_results),
1334 Ok(Err(e)) => Err(e),
1335 Err(join_err) => {
1336 trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1337 Err(EthFilterError::InternalError)
1338 }
1339 }
1340 });
1341
1342 self.pending_tasks.push_back(chunk_task);
1343 }
1344 }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349 use super::*;
1350 use crate::{eth::EthApi, EthApiBuilder};
1351 use alloy_network::Ethereum;
1352 use alloy_primitives::FixedBytes;
1353 use rand::Rng;
1354 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1355 use reth_ethereum_primitives::TxType;
1356 use reth_evm_ethereum::EthEvmConfig;
1357 use reth_network_api::noop::NoopNetwork;
1358 use reth_provider::test_utils::MockEthProvider;
1359 use reth_rpc_convert::RpcConverter;
1360 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1361 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1362 use reth_tasks::Runtime;
1363 use reth_testing_utils::generators;
1364 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1365 use std::{collections::VecDeque, sync::Arc};
1366
1367 #[test]
1368 fn test_block_range_iter() {
1369 let mut rng = generators::rng();
1370
1371 let start = rng.random::<u32>() as u64;
1372 let end = start.saturating_add(rng.random::<u32>() as u64);
1373 let step = rng.random::<u16>() as u64;
1374 let range = start..=end;
1375 let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1376 let (from, mut end) = iter.next().unwrap();
1377 assert_eq!(from, start);
1378 assert_eq!(end, (from + step).min(*range.end()));
1379
1380 for (next_from, next_end) in iter {
1381 assert_eq!(next_from, end + 1);
1383 end = next_end;
1384 }
1385
1386 assert_eq!(end, *range.end());
1387 }
1388
1389 #[expect(clippy::type_complexity)]
1391 fn build_test_eth_api(
1392 provider: MockEthProvider,
1393 ) -> EthApi<
1394 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1395 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1396 > {
1397 EthApiBuilder::new(
1398 provider.clone(),
1399 testing_pool(),
1400 NoopNetwork::default(),
1401 EthEvmConfig::new(provider.chain_spec()),
1402 )
1403 .build()
1404 }
1405
1406 #[tokio::test]
1407 async fn test_range_block_mode_empty_range() {
1408 let provider = MockEthProvider::default();
1409 let eth_api = build_test_eth_api(provider);
1410
1411 let eth_filter =
1412 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1413 let filter_inner = eth_filter.inner;
1414
1415 let headers = vec![];
1416 let max_range = 100;
1417
1418 let mut range_mode = RangeBlockMode {
1419 filter_inner,
1420 iter: headers.into_iter().peekable(),
1421 next: VecDeque::new(),
1422 max_range,
1423 pending_tasks: FuturesOrdered::new(),
1424 };
1425
1426 let result = range_mode.next().await;
1427 assert!(result.is_ok());
1428 assert!(result.unwrap().is_none());
1429 }
1430
1431 #[tokio::test]
1432 async fn test_range_block_mode_queued_results_priority() {
1433 let provider = MockEthProvider::default();
1434 let eth_api = build_test_eth_api(provider);
1435
1436 let eth_filter =
1437 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1438 let filter_inner = eth_filter.inner;
1439
1440 let headers = vec![
1441 SealedHeader::new(
1442 alloy_consensus::Header { number: 100, ..Default::default() },
1443 FixedBytes::random(),
1444 ),
1445 SealedHeader::new(
1446 alloy_consensus::Header { number: 101, ..Default::default() },
1447 FixedBytes::random(),
1448 ),
1449 ];
1450
1451 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1453 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1454
1455 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1457 tx_type: TxType::Legacy,
1458 cumulative_gas_used: 100_000,
1459 logs: vec![],
1460 success: true,
1461 };
1462 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1463 tx_type: TxType::Eip1559,
1464 cumulative_gas_used: 200_000,
1465 logs: vec![],
1466 success: true,
1467 };
1468 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1469 tx_type: TxType::Eip2930,
1470 cumulative_gas_used: 150_000,
1471 logs: vec![],
1472 success: false, };
1474
1475 let mock_result_1 = ReceiptBlockResult {
1476 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1477 recovered_block: None,
1478 header: SealedHeader::new(
1479 alloy_consensus::Header { number: 42, ..Default::default() },
1480 expected_block_hash_1,
1481 ),
1482 };
1483
1484 let mock_result_2 = ReceiptBlockResult {
1485 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1486 recovered_block: None,
1487 header: SealedHeader::new(
1488 alloy_consensus::Header { number: 43, ..Default::default() },
1489 expected_block_hash_2,
1490 ),
1491 };
1492
1493 let mut range_mode = RangeBlockMode {
1494 filter_inner,
1495 iter: headers.into_iter().peekable(),
1496 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1498 pending_tasks: FuturesOrdered::new(),
1499 };
1500
1501 let result1 = range_mode.next().await;
1503 assert!(result1.is_ok());
1504 let receipt_result1 = result1.unwrap().unwrap();
1505 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1506 assert_eq!(receipt_result1.header.number, 42);
1507
1508 assert_eq!(receipt_result1.receipts.len(), 2);
1510 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1511 assert_eq!(
1512 receipt_result1.receipts[0].cumulative_gas_used,
1513 mock_receipt_1.cumulative_gas_used
1514 );
1515 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1516 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1517 assert_eq!(
1518 receipt_result1.receipts[1].cumulative_gas_used,
1519 mock_receipt_2.cumulative_gas_used
1520 );
1521 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1522
1523 let result2 = range_mode.next().await;
1525 assert!(result2.is_ok());
1526 let receipt_result2 = result2.unwrap().unwrap();
1527 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1528 assert_eq!(receipt_result2.header.number, 43);
1529
1530 assert_eq!(receipt_result2.receipts.len(), 1);
1532 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1533 assert_eq!(
1534 receipt_result2.receipts[0].cumulative_gas_used,
1535 mock_receipt_3.cumulative_gas_used
1536 );
1537 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1538
1539 assert!(range_mode.next.is_empty());
1541
1542 let result3 = range_mode.next().await;
1543 assert!(result3.is_ok());
1544 }
1545
1546 #[tokio::test]
1547 async fn test_range_block_mode_single_block_no_receipts() {
1548 let provider = MockEthProvider::default();
1549 let eth_api = build_test_eth_api(provider);
1550
1551 let eth_filter =
1552 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1553 let filter_inner = eth_filter.inner;
1554
1555 let headers = vec![SealedHeader::new(
1556 alloy_consensus::Header { number: 100, ..Default::default() },
1557 FixedBytes::random(),
1558 )];
1559
1560 let mut range_mode = RangeBlockMode {
1561 filter_inner,
1562 iter: headers.into_iter().peekable(),
1563 next: VecDeque::new(),
1564 max_range: 100,
1565 pending_tasks: FuturesOrdered::new(),
1566 };
1567
1568 let result = range_mode.next().await;
1569 assert!(result.is_ok());
1570 }
1571
1572 #[tokio::test]
1573 async fn test_range_block_mode_provider_receipts() {
1574 let provider = MockEthProvider::default();
1575
1576 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1577 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1578 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1579
1580 let block_hash_1 = FixedBytes::random();
1581 let block_hash_2 = FixedBytes::random();
1582 let block_hash_3 = FixedBytes::random();
1583
1584 provider.add_header(block_hash_1, header_1.clone());
1585 provider.add_header(block_hash_2, header_2.clone());
1586 provider.add_header(block_hash_3, header_3.clone());
1587
1588 let mock_log = alloy_primitives::Log {
1590 address: alloy_primitives::Address::ZERO,
1591 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1592 };
1593
1594 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1595 tx_type: TxType::Legacy,
1596 cumulative_gas_used: 21_000,
1597 logs: vec![mock_log.clone()],
1598 success: true,
1599 };
1600 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1601 tx_type: TxType::Eip1559,
1602 cumulative_gas_used: 42_000,
1603 logs: vec![mock_log.clone()],
1604 success: true,
1605 };
1606 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1607 tx_type: TxType::Eip2930,
1608 cumulative_gas_used: 30_000,
1609 logs: vec![mock_log.clone()],
1610 success: false,
1611 };
1612
1613 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1614 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1615
1616 let eth_api = build_test_eth_api(provider);
1617
1618 let eth_filter =
1619 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1620 let filter_inner = eth_filter.inner;
1621
1622 let headers = vec![
1623 SealedHeader::new(header_1, block_hash_1),
1624 SealedHeader::new(header_2, block_hash_2),
1625 SealedHeader::new(header_3, block_hash_3),
1626 ];
1627
1628 let mut range_mode = RangeBlockMode {
1629 filter_inner,
1630 iter: headers.into_iter().peekable(),
1631 next: VecDeque::new(),
1632 max_range: 3, pending_tasks: FuturesOrdered::new(),
1634 };
1635
1636 let result = range_mode.next().await;
1638 assert!(result.is_ok());
1639 let receipt_result = result.unwrap().unwrap();
1640
1641 assert_eq!(receipt_result.header.hash(), block_hash_1);
1642 assert_eq!(receipt_result.header.number, 100);
1643 assert_eq!(receipt_result.receipts.len(), 2);
1644
1645 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1647 assert_eq!(
1648 receipt_result.receipts[0].cumulative_gas_used,
1649 receipt_100_1.cumulative_gas_used
1650 );
1651 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1652
1653 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1654 assert_eq!(
1655 receipt_result.receipts[1].cumulative_gas_used,
1656 receipt_100_2.cumulative_gas_used
1657 );
1658 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1659
1660 let result2 = range_mode.next().await;
1662 assert!(result2.is_ok());
1663 let receipt_result2 = result2.unwrap().unwrap();
1664
1665 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1666 assert_eq!(receipt_result2.header.number, 101);
1667 assert_eq!(receipt_result2.receipts.len(), 1);
1668
1669 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1671 assert_eq!(
1672 receipt_result2.receipts[0].cumulative_gas_used,
1673 receipt_101_1.cumulative_gas_used
1674 );
1675 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1676
1677 let result3 = range_mode.next().await;
1679 assert!(result3.is_ok());
1680 assert!(result3.unwrap().is_none());
1681 }
1682
1683 #[tokio::test]
1684 async fn test_range_block_mode_iterator_exhaustion() {
1685 let provider = MockEthProvider::default();
1686
1687 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1688 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1689
1690 let block_hash_100 = FixedBytes::random();
1691 let block_hash_101 = FixedBytes::random();
1692
1693 provider.add_header(block_hash_100, header_100.clone());
1695 provider.add_header(block_hash_101, header_101.clone());
1696
1697 let mock_receipt = reth_ethereum_primitives::Receipt {
1699 tx_type: TxType::Legacy,
1700 cumulative_gas_used: 21_000,
1701 logs: vec![],
1702 success: true,
1703 };
1704 provider.add_receipts(100, vec![mock_receipt.clone()]);
1705 provider.add_receipts(101, vec![mock_receipt.clone()]);
1706
1707 let eth_api = build_test_eth_api(provider);
1708
1709 let eth_filter =
1710 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1711 let filter_inner = eth_filter.inner;
1712
1713 let headers = vec![
1714 SealedHeader::new(header_100, block_hash_100),
1715 SealedHeader::new(header_101, block_hash_101),
1716 ];
1717
1718 let mut range_mode = RangeBlockMode {
1719 filter_inner,
1720 iter: headers.into_iter().peekable(),
1721 next: VecDeque::new(),
1722 max_range: 1,
1723 pending_tasks: FuturesOrdered::new(),
1724 };
1725
1726 let result1 = range_mode.next().await;
1727 assert!(result1.is_ok());
1728 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1733 assert!(result2.is_ok());
1734 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1738
1739 let result3 = range_mode.next().await;
1741 assert!(result3.is_ok());
1742 assert!(result3.unwrap().is_none());
1743 }
1744
1745 #[tokio::test]
1746 async fn test_cached_mode_with_mock_receipts() {
1747 let test_hash = FixedBytes::from([42u8; 32]);
1749 let test_block_number = 100u64;
1750 let test_header = SealedHeader::new(
1751 alloy_consensus::Header {
1752 number: test_block_number,
1753 gas_used: 50_000,
1754 ..Default::default()
1755 },
1756 test_hash,
1757 );
1758
1759 let mock_log = alloy_primitives::Log {
1761 address: alloy_primitives::Address::ZERO,
1762 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1763 };
1764
1765 let mock_receipt = reth_ethereum_primitives::Receipt {
1766 tx_type: TxType::Legacy,
1767 cumulative_gas_used: 21_000,
1768 logs: vec![mock_log],
1769 success: true,
1770 };
1771
1772 let provider = MockEthProvider::default();
1773 provider.add_header(test_hash, test_header.header().clone());
1774 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1775
1776 let eth_api = build_test_eth_api(provider);
1777 let eth_filter =
1778 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1779 let filter_inner = eth_filter.inner;
1780
1781 let headers = vec![test_header.clone()];
1782
1783 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1784
1785 let result = cached_mode.next().await.expect("next should succeed");
1787 let receipt_block_result = result.expect("should have receipt result");
1788 assert_eq!(receipt_block_result.header.hash(), test_hash);
1789 assert_eq!(receipt_block_result.header.number, test_block_number);
1790 assert_eq!(receipt_block_result.receipts.len(), 1);
1791 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1792 assert_eq!(
1793 receipt_block_result.receipts[0].cumulative_gas_used,
1794 mock_receipt.cumulative_gas_used
1795 );
1796 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1797
1798 let result2 = cached_mode.next().await;
1800 assert!(result2.is_ok());
1801 assert!(result2.unwrap().is_none());
1802 }
1803
1804 #[tokio::test]
1805 async fn test_cached_mode_empty_headers() {
1806 let provider = MockEthProvider::default();
1807 let eth_api = build_test_eth_api(provider);
1808
1809 let eth_filter =
1810 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1811 let filter_inner = eth_filter.inner;
1812
1813 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1814
1815 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1816
1817 let result = cached_mode.next().await.expect("next should succeed");
1819 assert!(result.is_none());
1820 }
1821
1822 #[tokio::test]
1823 async fn test_log_limit_retry_range_excludes_overflow_block() {
1824 let provider = MockEthProvider::default();
1825
1826 use alloy_consensus::TxLegacy;
1827 use reth_db_api::models::StoredBlockBodyIndices;
1828 use reth_ethereum_primitives::{TransactionSigned, TxType};
1829
1830 let tx_inner = TxLegacy {
1831 chain_id: Some(1),
1832 nonce: 0,
1833 gas_price: 21_000,
1834 gas_limit: 21_000,
1835 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1836 value: alloy_primitives::U256::ZERO,
1837 input: alloy_primitives::Bytes::new(),
1838 };
1839 let signature = alloy_primitives::Signature::test_signature();
1840 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1841
1842 let mock_log = alloy_primitives::Log {
1843 address: alloy_primitives::Address::ZERO,
1844 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1845 };
1846
1847 let receipt = reth_ethereum_primitives::Receipt {
1848 tx_type: TxType::Legacy,
1849 cumulative_gas_used: 21_000,
1850 logs: vec![mock_log],
1851 success: true,
1852 };
1853
1854 let mut prev_hash = alloy_primitives::B256::default();
1855 for (idx, block_number) in (100u64..=102).enumerate() {
1856 let header = alloy_consensus::Header {
1857 number: block_number,
1858 parent_hash: prev_hash,
1859 logs_bloom: alloy_primitives::Bloom::from([1u8; 256]),
1860 ..Default::default()
1861 };
1862 let hash = header.hash_slow();
1863 prev_hash = hash;
1864
1865 let block = reth_ethereum_primitives::Block {
1866 header,
1867 body: reth_ethereum_primitives::BlockBody {
1868 transactions: vec![tx.clone()],
1869 ..Default::default()
1870 },
1871 };
1872 provider.add_block(hash, block);
1873 provider.add_receipts(block_number, vec![receipt.clone()]);
1874 provider.add_block_body_indices(
1875 block_number,
1876 StoredBlockBodyIndices { first_tx_num: idx as u64, tx_count: 1 },
1877 );
1878 }
1879
1880 let eth_api = build_test_eth_api(provider);
1881 let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1882 let err = eth_filter
1883 .inner
1884 .clone()
1885 .get_logs_in_block_range(
1886 Filter::default(),
1887 100,
1888 102,
1889 QueryLimits { max_blocks_per_filter: None, max_logs_per_response: Some(2) },
1890 )
1891 .await
1892 .expect_err("range should exceed max logs");
1893
1894 let EthFilterError::QueryExceedsMaxResults { max_logs, from_block, to_block } = err else {
1895 panic!("unexpected error: {err:?}");
1896 };
1897
1898 assert_eq!(max_logs, 2);
1899 assert_eq!(from_block, 100);
1900 assert_eq!(to_block, 101);
1901 }
1902
1903 #[tokio::test]
1904 async fn test_non_consecutive_headers_after_bloom_filter() {
1905 let provider = MockEthProvider::default();
1906
1907 let mut expected_hashes = vec![];
1909 let mut prev_hash = alloy_primitives::B256::default();
1910
1911 use alloy_consensus::TxLegacy;
1913 use reth_ethereum_primitives::{TransactionSigned, TxType};
1914
1915 let tx_inner = TxLegacy {
1916 chain_id: Some(1),
1917 nonce: 0,
1918 gas_price: 21_000,
1919 gas_limit: 21_000,
1920 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1921 value: alloy_primitives::U256::ZERO,
1922 input: alloy_primitives::Bytes::new(),
1923 };
1924 let signature = alloy_primitives::Signature::test_signature();
1925 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1926
1927 for i in 100u64..=103 {
1928 let header = alloy_consensus::Header {
1929 number: i,
1930 parent_hash: prev_hash,
1931 logs_bloom: if i == 100 || i == 102 {
1933 alloy_primitives::Bloom::from([1u8; 256])
1934 } else {
1935 alloy_primitives::Bloom::default()
1936 },
1937 ..Default::default()
1938 };
1939
1940 let hash = header.hash_slow();
1941 expected_hashes.push(hash);
1942 prev_hash = hash;
1943
1944 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1946
1947 let block = reth_ethereum_primitives::Block {
1948 header,
1949 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1950 };
1951 provider.add_block(hash, block);
1952 }
1953
1954 let mock_log = alloy_primitives::Log {
1956 address: alloy_primitives::Address::ZERO,
1957 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1958 };
1959
1960 let receipt = reth_ethereum_primitives::Receipt {
1961 tx_type: TxType::Legacy,
1962 cumulative_gas_used: 21_000,
1963 logs: vec![mock_log],
1964 success: true,
1965 };
1966
1967 provider.add_receipts(100, vec![receipt.clone()]);
1968 provider.add_receipts(101, vec![]);
1969 provider.add_receipts(102, vec![receipt.clone()]);
1970 provider.add_receipts(103, vec![]);
1971
1972 use reth_db_api::models::StoredBlockBodyIndices;
1974 provider
1975 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1976 provider
1977 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1978 provider
1979 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1980 provider
1981 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1982
1983 let eth_api = build_test_eth_api(provider);
1984 let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1985
1986 let filter = Filter::default();
1988
1989 let logs = eth_filter
1991 .inner
1992 .clone()
1993 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1994 .await
1995 .expect("should succeed");
1996
1997 assert_eq!(logs.len(), 2);
1999
2000 assert_eq!(logs[0].block_number, Some(100));
2001 assert_eq!(logs[1].block_number, Some(102));
2002
2003 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
2007}