1use alloy_consensus::BlockHeader;
4use alloy_eips::BlockNumberOrTag;
5use alloy_primitives::{Sealable, TxHash};
6use alloy_rpc_types_eth::{
7 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
8 PendingTransactionFilterKind,
9};
10use async_trait::async_trait;
11use futures::{
12 future::TryFutureExt,
13 stream::{FuturesOrdered, StreamExt},
14 Future,
15};
16use itertools::Itertools;
17use jsonrpsee::{core::RpcResult, server::IdProvider};
18use reth_errors::ProviderError;
19use reth_primitives_traits::{NodePrimitives, SealedHeader};
20use reth_rpc_eth_api::{
21 helpers::{EthBlocks, LoadReceipt},
22 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
23 RpcNodeCoreExt, RpcTransaction,
24};
25use reth_rpc_eth_types::{
26 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
27 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
28};
29use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
30use reth_storage_api::{
31 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
32 ProviderReceipt, ReceiptProvider,
33};
34use reth_tasks::Runtime;
35use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
36use std::{
37 collections::{HashMap, VecDeque},
38 fmt,
39 iter::{Peekable, StepBy},
40 ops::RangeInclusive,
41 pin::Pin,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 sync::{mpsc::Receiver, oneshot, Mutex},
47 time::MissedTickBehavior,
48};
49use tracing::{debug, error, trace};
50
51impl<Eth> EngineEthFilter for EthFilter<Eth>
52where
53 Eth: FullEthApiTypes
54 + RpcNodeCoreExt<Provider: BlockIdReader>
55 + LoadReceipt
56 + EthBlocks
57 + 'static,
58{
59 fn logs(
61 &self,
62 filter: Filter,
63 limits: QueryLimits,
64 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
65 trace!(target: "rpc::eth", "Serving eth_getLogs");
66 self.logs_for_filter(filter, limits).map_err(|e| e.into())
67 }
68}
69
70const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;
72
73const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;
75
76const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;
78
79const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;
81
82const MAX_HEADERS_RANGE: u64 = 1_000; const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;
87
88const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
90
91pub struct EthFilter<Eth: EthApiTypes> {
95 inner: Arc<EthFilterInner<Eth>>,
97}
98
99impl<Eth> Clone for EthFilter<Eth>
100where
101 Eth: EthApiTypes,
102{
103 fn clone(&self) -> Self {
104 Self { inner: self.inner.clone() }
105 }
106}
107
108impl<Eth> EthFilter<Eth>
109where
110 Eth: EthApiTypes + 'static,
111{
112 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Runtime) -> Self {
140 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
141 config;
142 let inner = EthFilterInner {
143 eth_api,
144 active_filters: ActiveFilters::new(),
145 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
146 max_headers_range: MAX_HEADERS_RANGE,
147 task_spawner,
148 stale_filter_ttl,
149 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
150 };
151
152 let eth_filter = Self { inner: Arc::new(inner) };
153
154 let this = eth_filter.clone();
155 eth_filter.inner.task_spawner.spawn_critical_task(
156 "eth-filters_stale-filters-clean",
157 async move {
158 this.watch_and_clear_stale_filters().await;
159 },
160 );
161
162 eth_filter
163 }
164
165 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
167 &self.inner.active_filters
168 }
169
170 async fn watch_and_clear_stale_filters(&self) {
173 let mut interval = tokio::time::interval_at(
174 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
175 self.inner.stale_filter_ttl,
176 );
177 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
178 loop {
179 interval.tick().await;
180 self.clear_stale_filters(Instant::now()).await;
181 }
182 }
183
184 pub async fn clear_stale_filters(&self, now: Instant) {
187 trace!(target: "rpc::eth", "clear stale filters");
188 let mut filters = self.active_filters().inner.lock().await;
189 filters.retain(|id, filter| {
190 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
191
192 if !is_valid {
193 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
194 }
195
196 is_valid
197 });
198 filters.shrink_to_fit();
199 }
200}
201
202impl<Eth> EthFilter<Eth>
203where
204 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
205 + RpcNodeCoreExt
206 + LoadReceipt
207 + EthBlocks
208 + 'static,
209{
210 fn provider(&self) -> &Eth::Provider {
212 self.inner.eth_api.provider()
213 }
214
215 fn pool(&self) -> &Eth::Pool {
217 self.inner.eth_api.pool()
218 }
219
220 pub async fn filter_changes(
222 &self,
223 id: FilterId,
224 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
225 let info = self.provider().chain_info()?;
226 let best_number = info.best_number;
227
228 let (start_block, kind) = {
231 let mut filters = self.inner.active_filters.inner.lock().await;
232 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
233
234 if filter.block > best_number {
235 return Ok(FilterChanges::Empty)
237 }
238
239 let mut block = best_number + 1;
243 std::mem::swap(&mut filter.block, &mut block);
244 filter.last_poll_timestamp = Instant::now();
245
246 (block, filter.kind.clone())
247 };
248
249 match kind {
250 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
251 FilterKind::Block => {
252 let end_block = best_number + 1;
255 let block_hashes =
256 self.provider().canonical_hashes_range(start_block, end_block).map_err(
257 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
258 )?;
259 Ok(FilterChanges::Hashes(block_hashes))
260 }
261 FilterKind::Log(filter) => {
262 let (from_block_number, to_block_number) = match filter.block_option {
263 FilterBlockOption::Range { from_block, to_block } => {
264 let from = from_block
265 .map(|num| self.provider().convert_block_number(num))
266 .transpose()?
267 .flatten();
268 let to = to_block
269 .map(|num| self.provider().convert_block_number(num))
270 .transpose()?
271 .flatten();
272 logs_utils::get_filter_block_range(from, to, start_block, info)?
273 }
274 FilterBlockOption::AtBlockHash(block_hash) => {
275 let block_number = self
279 .provider()
280 .block_number(block_hash)?
281 .ok_or(ProviderError::HeaderNotFound(block_hash.into()))?;
282 (block_number, block_number)
283 }
284 };
285 let logs = self
286 .inner
287 .clone()
288 .get_logs_in_block_range(
289 *filter,
290 from_block_number,
291 to_block_number,
292 self.inner.query_limits,
293 )
294 .await?;
295 Ok(FilterChanges::Logs(logs))
296 }
297 }
298 }
299
300 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
306 let filter = {
307 let mut filters = self.inner.active_filters.inner.lock().await;
308 let filter =
309 filters.get_mut(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
310 if let FilterKind::Log(ref inner_filter) = filter.kind {
311 filter.last_poll_timestamp = Instant::now();
312 *inner_filter.clone()
313 } else {
314 return Err(EthFilterError::FilterNotFound(id))
316 }
317 };
318
319 self.logs_for_filter(filter, self.inner.query_limits).await
320 }
321
322 async fn logs_for_filter(
324 &self,
325 filter: Filter,
326 limits: QueryLimits,
327 ) -> Result<Vec<Log>, EthFilterError> {
328 self.inner.clone().logs_for_filter(filter, limits).await
329 }
330}
331
332#[async_trait]
333impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
334where
335 Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
336{
337 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
339 trace!(target: "rpc::eth", "Serving eth_newFilter");
340 self.inner
341 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
342 .await
343 }
344
345 async fn new_block_filter(&self) -> RpcResult<FilterId> {
347 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
348 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
349 }
350
351 async fn new_pending_transaction_filter(
353 &self,
354 kind: Option<PendingTransactionFilterKind>,
355 ) -> RpcResult<FilterId> {
356 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
357
358 let transaction_kind = match kind.unwrap_or_default() {
359 PendingTransactionFilterKind::Hashes => {
360 let receiver = self.pool().pending_transactions_listener();
361 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
362 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
363 }
364 PendingTransactionFilterKind::Full => {
365 let stream = self.pool().new_pending_pool_transactions_listener();
366 let full_txs_receiver = FullTransactionsReceiver::new(
367 stream,
368 dyn_clone::clone(self.inner.eth_api.converter()),
369 );
370 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
371 full_txs_receiver,
372 )))
373 }
374 };
375
376 self.inner.install_filter(transaction_kind).await
378 }
379
380 async fn filter_changes(
382 &self,
383 id: FilterId,
384 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
385 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
386 Ok(Self::filter_changes(self, id).await?)
387 }
388
389 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
395 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
396 Ok(Self::filter_logs(self, id).await?)
397 }
398
399 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
401 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
402 let mut filters = self.inner.active_filters.inner.lock().await;
403 if filters.remove(&id).is_some() {
404 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
405 Ok(true)
406 } else {
407 Ok(false)
408 }
409 }
410
411 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
415 trace!(target: "rpc::eth", "Serving eth_getLogs");
416 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
417 }
418}
419
420impl<Eth> std::fmt::Debug for EthFilter<Eth>
421where
422 Eth: EthApiTypes,
423{
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 f.debug_struct("EthFilter").finish_non_exhaustive()
426 }
427}
428
429#[derive(Debug)]
431struct EthFilterInner<Eth: EthApiTypes> {
432 eth_api: Eth,
434 active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
436 id_provider: Arc<dyn IdProvider>,
438 query_limits: QueryLimits,
440 max_headers_range: u64,
442 task_spawner: Runtime,
444 stale_filter_ttl: Duration,
446}
447
448impl<Eth> EthFilterInner<Eth>
449where
450 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
451 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
452 + LoadReceipt
453 + EthBlocks
454 + 'static,
455{
456 fn provider(&self) -> &Eth::Provider {
458 self.eth_api.provider()
459 }
460
461 fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
463 self.eth_api.cache()
464 }
465
466 async fn logs_for_filter(
468 self: Arc<Self>,
469 filter: Filter,
470 limits: QueryLimits,
471 ) -> Result<Vec<Log>, EthFilterError> {
472 match filter.block_option {
473 FilterBlockOption::AtBlockHash(block_hash) => {
474 let Some((receipts, maybe_block)) =
476 self.eth_cache().get_receipts_and_maybe_block(block_hash).await?
477 else {
478 return Err(ProviderError::HeaderNotFound(block_hash.into()).into())
479 };
480
481 let (block_number, block_timestamp) = if let Some(block) = &maybe_block {
483 (block.header().number(), block.header().timestamp())
484 } else {
485 let header = self
486 .provider()
487 .header_by_hash_or_number(block_hash.into())?
488 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
489 (header.number(), header.timestamp())
490 };
491
492 let earliest_block = self.provider().earliest_block_number()?;
494 if block_number < earliest_block {
495 return Err(EthApiError::PrunedHistoryUnavailable.into());
496 }
497
498 let block_num_hash = BlockNumHash::new(block_number, block_hash);
499
500 let mut all_logs = Vec::new();
501 append_matching_block_logs(
502 &mut all_logs,
503 maybe_block
504 .map(ProviderOrBlock::Block)
505 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
506 &filter,
507 block_num_hash,
508 &receipts,
509 false,
510 block_timestamp,
511 )?;
512
513 Ok(all_logs)
514 }
515 FilterBlockOption::Range { from_block, to_block } => {
516 if from_block.is_some_and(|b| b.is_pending()) {
518 let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
519 if !(to_block.is_pending() || to_block.is_number()) {
520 return Ok(Vec::new());
522 }
523 if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
525 if let BlockNumberOrTag::Number(to_block) = to_block &&
526 to_block < pending_block.block.number()
527 {
528 return Ok(Vec::new());
530 }
531
532 let info = self.provider().chain_info()?;
533 if pending_block.block.number() > info.best_number {
534 let mut all_logs = Vec::new();
536 let timestamp = pending_block.block.timestamp();
537 let block_num_hash = pending_block.block.num_hash();
538 append_matching_block_logs(
539 &mut all_logs,
540 ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
541 &filter,
542 block_num_hash,
543 &pending_block.receipts,
544 false, timestamp,
546 )?;
547 return Ok(all_logs);
548 }
549 }
550 }
551
552 let info = self.provider().chain_info()?;
553 let start_block = info.best_number;
554 let from = from_block
555 .map(|num| self.provider().convert_block_number(num))
556 .transpose()?
557 .flatten();
558 let to = to_block
559 .map(|num| self.provider().convert_block_number(num))
560 .transpose()?
561 .flatten();
562
563 if let Some(t) = to &&
565 t > info.best_number
566 {
567 return Err(EthFilterError::BlockRangeExceedsHead);
568 }
569
570 if let Some(f) = from &&
571 f > info.best_number
572 {
573 return Ok(Vec::new());
575 }
576
577 let (from_block_number, to_block_number) =
578 logs_utils::get_filter_block_range(from, to, start_block, info)?;
579
580 let earliest_block = self.provider().earliest_block_number()?;
582 if from_block_number < earliest_block {
583 return Err(EthApiError::PrunedHistoryUnavailable.into());
584 }
585
586 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
587 .await
588 }
589 }
590 }
591
592 async fn install_filter(
594 &self,
595 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
596 ) -> RpcResult<FilterId> {
597 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
598 let subscription_id = self.id_provider.next_id();
599
600 let id = match subscription_id {
601 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
602 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
603 };
604 let mut filters = self.active_filters.inner.lock().await;
605 filters.insert(
606 id.clone(),
607 ActiveFilter {
608 block: last_poll_block_number,
609 last_poll_timestamp: Instant::now(),
610 kind,
611 },
612 );
613 Ok(id)
614 }
615
616 async fn get_logs_in_block_range(
622 self: Arc<Self>,
623 filter: Filter,
624 from_block: u64,
625 to_block: u64,
626 limits: QueryLimits,
627 ) -> Result<Vec<Log>, EthFilterError> {
628 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
629
630 if to_block < from_block {
632 return Err(EthFilterError::InvalidBlockRangeParams)
633 }
634
635 if let Some(max_blocks_per_filter) =
636 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
637 {
638 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
639 }
640
641 let (tx, rx) = oneshot::channel();
642 let this = self.clone();
643 self.task_spawner.spawn_blocking_task(async move {
644 let res =
645 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
646 let _ = tx.send(res);
647 });
648
649 rx.await.map_err(|_| EthFilterError::InternalError)?
650 }
651
652 async fn get_logs_in_block_range_inner(
661 self: Arc<Self>,
662 filter: &Filter,
663 from_block: u64,
664 to_block: u64,
665 limits: QueryLimits,
666 ) -> Result<Vec<Log>, EthFilterError> {
667 let mut all_logs = Vec::new();
668 let mut matching_headers = Vec::new();
669
670 let chain_tip = self.provider().best_block_number()?;
672
673 for (from, to) in
675 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
676 {
677 let headers = self.provider().headers_range(from..=to)?;
678
679 let mut headers_iter = headers.into_iter().peekable();
680
681 while let Some(header) = headers_iter.next() {
682 if !filter.matches_bloom(header.logs_bloom()) {
683 continue
684 }
685
686 let current_number = header.number();
687
688 let block_hash = match headers_iter.peek() {
689 Some(next_header) if next_header.number() == current_number + 1 => {
690 next_header.parent_hash()
692 }
693 _ => {
694 header.hash_slow()
696 }
697 };
698
699 matching_headers.push(SealedHeader::new(header, block_hash));
700 }
701 }
702
703 let mut range_mode = RangeMode::new(
705 self.clone(),
706 matching_headers,
707 from_block,
708 to_block,
709 self.max_headers_range,
710 chain_tip,
711 );
712
713 while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
715 range_mode.next().await?
716 {
717 let num_hash = header.num_hash();
718 append_matching_block_logs(
719 &mut all_logs,
720 recovered_block
721 .map(ProviderOrBlock::Block)
722 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
723 filter,
724 num_hash,
725 &receipts,
726 false,
727 header.timestamp(),
728 )?;
729
730 let is_multi_block_range = from_block != to_block;
733 if let Some(max_logs_per_response) = limits.max_logs_per_response &&
734 is_multi_block_range &&
735 all_logs.len() > max_logs_per_response
736 {
737 debug!(
738 target: "rpc::eth::filter",
739 logs_found = all_logs.len(),
740 max_logs_per_response,
741 from_block,
742 to_block = num_hash.number,
743 "Query exceeded max logs per response limit"
744 );
745 return Err(EthFilterError::QueryExceedsMaxResults {
746 max_logs: max_logs_per_response,
747 from_block,
748 to_block: num_hash.number,
749 });
750 }
751 }
752
753 Ok(all_logs)
754 }
755}
756
757#[derive(Debug, Clone, Default)]
759pub struct ActiveFilters<T> {
760 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
761}
762
763impl<T> ActiveFilters<T> {
764 pub fn new() -> Self {
766 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
767 }
768
769 pub async fn contains(&self, id: &FilterId) -> bool {
771 self.inner.lock().await.contains_key(id)
772 }
773
774 pub async fn len(&self) -> usize {
776 self.inner.lock().await.len()
777 }
778
779 pub async fn is_empty(&self) -> bool {
781 self.inner.lock().await.is_empty()
782 }
783
784 pub async fn ids(&self) -> Vec<FilterId> {
786 self.inner.lock().await.keys().cloned().collect()
787 }
788}
789
790#[derive(Debug)]
792struct ActiveFilter<T> {
793 block: u64,
795 last_poll_timestamp: Instant,
797 kind: FilterKind<T>,
799}
800
801#[derive(Debug, Clone)]
803struct PendingTransactionsReceiver {
804 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
805}
806
807impl PendingTransactionsReceiver {
808 fn new(receiver: Receiver<TxHash>) -> Self {
809 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
810 }
811
812 async fn drain<T>(&self) -> FilterChanges<T> {
814 let mut pending_txs = Vec::new();
815 let mut prepared_stream = self.txs_receiver.lock().await;
816
817 while let Ok(tx_hash) = prepared_stream.try_recv() {
818 pending_txs.push(tx_hash);
819 }
820
821 FilterChanges::Hashes(pending_txs)
823 }
824}
825
826#[derive(Debug, Clone)]
828struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
829 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
830 converter: TxCompat,
831}
832
833impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
834where
835 T: PoolTransaction + 'static,
836 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
837{
838 fn new(stream: NewSubpoolTransactionStream<T>, converter: TxCompat) -> Self {
840 Self { txs_stream: Arc::new(Mutex::new(stream)), converter }
841 }
842
843 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
845 let mut pending_txs = Vec::new();
846 let mut prepared_stream = self.txs_stream.lock().await;
847
848 while let Ok(tx) = prepared_stream.try_recv() {
849 match self.converter.fill_pending(tx.transaction.to_consensus()) {
850 Ok(tx) => pending_txs.push(tx),
851 Err(err) => {
852 error!(target: "rpc",
853 %err,
854 "Failed to fill txn with block context"
855 );
856 }
857 }
858 }
859 FilterChanges::Transactions(pending_txs)
860 }
861}
862
863#[async_trait]
865trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
866 async fn drain(&self) -> FilterChanges<T>;
867}
868
869#[async_trait]
870impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
871 for FullTransactionsReceiver<T, TxCompat>
872where
873 T: PoolTransaction + 'static,
874 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
875{
876 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
877 Self::drain(self).await
878 }
879}
880
881#[derive(Debug, Clone)]
887enum PendingTransactionKind<T> {
888 Hashes(PendingTransactionsReceiver),
889 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
890}
891
892impl<T: 'static> PendingTransactionKind<T> {
893 async fn drain(&self) -> FilterChanges<T> {
894 match self {
895 Self::Hashes(receiver) => receiver.drain().await,
896 Self::FullTransaction(receiver) => receiver.drain().await,
897 }
898 }
899}
900
901#[derive(Clone, Debug)]
902enum FilterKind<T> {
903 Log(Box<Filter>),
904 Block,
905 PendingTransaction(PendingTransactionKind<T>),
906}
907
908#[derive(Debug)]
910struct BlockRangeInclusiveIter {
911 iter: StepBy<RangeInclusive<u64>>,
912 step: u64,
913 end: u64,
914}
915
916impl BlockRangeInclusiveIter {
917 fn new(range: RangeInclusive<u64>, step: u64) -> Self {
918 Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
919 }
920}
921
922impl Iterator for BlockRangeInclusiveIter {
923 type Item = (u64, u64);
924
925 fn next(&mut self) -> Option<Self::Item> {
926 let start = self.iter.next()?;
927 let end = (start + self.step).min(self.end);
928 if start > end {
929 return None
930 }
931 Some((start, end))
932 }
933}
934
935#[derive(Debug, thiserror::Error)]
937pub enum EthFilterError {
938 #[error("filter not found")]
940 FilterNotFound(FilterId),
941 #[error("invalid block range params")]
943 InvalidBlockRangeParams,
944 #[error("block range extends beyond current head block")]
946 BlockRangeExceedsHead,
947 #[error("query exceeds max block range {0}")]
949 QueryExceedsMaxBlocks(u64),
950 #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
952 QueryExceedsMaxResults {
953 max_logs: usize,
955 from_block: u64,
957 to_block: u64,
959 },
960 #[error(transparent)]
962 EthAPIError(#[from] EthApiError),
963 #[error("internal filter error")]
965 InternalError,
966}
967
968impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
969 fn from(err: EthFilterError) -> Self {
970 match err {
971 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
972 jsonrpsee::types::error::INVALID_PARAMS_CODE,
973 "filter not found",
974 ),
975 err @ EthFilterError::InternalError => {
976 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
977 }
978 EthFilterError::EthAPIError(err) => err.into(),
979 err @ (EthFilterError::InvalidBlockRangeParams |
980 EthFilterError::QueryExceedsMaxBlocks(_) |
981 EthFilterError::QueryExceedsMaxResults { .. } |
982 EthFilterError::BlockRangeExceedsHead) => {
983 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
984 }
985 }
986 }
987}
988
989impl From<ProviderError> for EthFilterError {
990 fn from(err: ProviderError) -> Self {
991 Self::EthAPIError(err.into())
992 }
993}
994
995impl From<logs_utils::FilterBlockRangeError> for EthFilterError {
996 fn from(err: logs_utils::FilterBlockRangeError) -> Self {
997 match err {
998 logs_utils::FilterBlockRangeError::InvalidBlockRange => Self::InvalidBlockRangeParams,
999 logs_utils::FilterBlockRangeError::BlockRangeExceedsHead => Self::BlockRangeExceedsHead,
1000 }
1001 }
1002}
1003
1004struct ReceiptBlockResult<P>
1007where
1008 P: ReceiptProvider + BlockReader,
1009{
1010 receipts: Arc<Vec<ProviderReceipt<P>>>,
1012 recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
1014 header: SealedHeader<<P as HeaderProvider>::Header>,
1016}
1017
1018enum RangeMode<
1020 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1021 + EthApiTypes
1022 + LoadReceipt
1023 + EthBlocks
1024 + 'static,
1025> {
1026 Cached(CachedMode<Eth>),
1028 Range(RangeBlockMode<Eth>),
1030}
1031
1032impl<
1033 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1034 + EthApiTypes
1035 + LoadReceipt
1036 + EthBlocks
1037 + 'static,
1038 > RangeMode<Eth>
1039{
1040 fn new(
1042 filter_inner: Arc<EthFilterInner<Eth>>,
1043 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1044 from_block: u64,
1045 to_block: u64,
1046 max_headers_range: u64,
1047 chain_tip: u64,
1048 ) -> Self {
1049 let block_count = to_block - from_block + 1;
1050 let distance_from_tip = chain_tip.saturating_sub(to_block);
1051
1052 let use_cached_mode =
1054 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
1055
1056 if use_cached_mode && !sealed_headers.is_empty() {
1057 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
1058 } else {
1059 Self::Range(RangeBlockMode {
1060 filter_inner,
1061 iter: sealed_headers.into_iter().peekable(),
1062 next: VecDeque::new(),
1063 max_range: max_headers_range as usize,
1064 pending_tasks: FuturesOrdered::new(),
1065 })
1066 }
1067 }
1068
1069 const fn should_use_cached_mode(
1071 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
1072 block_count: u64,
1073 distance_from_tip: u64,
1074 ) -> bool {
1075 let bloom_matches = headers.len();
1077
1078 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
1080
1081 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
1082 }
1083
1084 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
1086 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
1088 return CACHED_MODE_BLOCK_THRESHOLD;
1089 }
1090
1091 match bloom_matches {
1092 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
1093 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
1094 _ => CACHED_MODE_BLOCK_THRESHOLD,
1095 }
1096 }
1097
1098 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1100 match self {
1101 Self::Cached(cached) => cached.next().await,
1102 Self::Range(range) => range.next().await,
1103 }
1104 }
1105}
1106
1107struct CachedMode<
1109 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1110 + EthApiTypes
1111 + LoadReceipt
1112 + EthBlocks
1113 + 'static,
1114> {
1115 filter_inner: Arc<EthFilterInner<Eth>>,
1116 headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1117}
1118
1119impl<
1120 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1121 + EthApiTypes
1122 + LoadReceipt
1123 + EthBlocks
1124 + 'static,
1125 > CachedMode<Eth>
1126{
1127 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1128 for header in self.headers_iter.by_ref() {
1129 if let Some((receipts, maybe_block)) =
1131 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1132 {
1133 return Ok(Some(ReceiptBlockResult {
1134 receipts,
1135 recovered_block: maybe_block,
1136 header,
1137 }));
1138 }
1139 }
1140
1141 Ok(None) }
1143}
1144
1145type ReceiptFetchFuture<P> =
1147 Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1148
1149struct RangeBlockMode<
1151 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1152 + EthApiTypes
1153 + LoadReceipt
1154 + EthBlocks
1155 + 'static,
1156> {
1157 filter_inner: Arc<EthFilterInner<Eth>>,
1158 iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
1159 next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
1160 max_range: usize,
1161 pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
1163}
1164
1165impl<
1166 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
1167 + EthApiTypes
1168 + LoadReceipt
1169 + EthBlocks
1170 + 'static,
1171 > RangeBlockMode<Eth>
1172{
1173 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1174 loop {
1175 if let Some(result) = self.next.pop_front() {
1177 return Ok(Some(result));
1178 }
1179
1180 if let Some(task_result) = self.pending_tasks.next().await {
1182 self.next.extend(task_result?);
1183 continue;
1184 }
1185
1186 let Some(next_header) = self.iter.next() else {
1188 return Ok(None);
1190 };
1191
1192 let mut range_headers = Vec::with_capacity(self.max_range);
1193 range_headers.push(next_header);
1194
1195 while range_headers.len() < self.max_range {
1197 let Some(peeked) = self.iter.peek() else { break };
1198 let Some(last_header) = range_headers.last() else { break };
1199
1200 let expected_next = last_header.number() + 1;
1201 if peeked.number() != expected_next {
1202 trace!(
1203 target: "rpc::eth::filter",
1204 last_block = last_header.number(),
1205 next_block = peeked.number(),
1206 expected = expected_next,
1207 range_size = range_headers.len(),
1208 "Non-consecutive block detected, stopping range collection"
1209 );
1210 break; }
1212
1213 let Some(next_header) = self.iter.next() else { break };
1214 range_headers.push(next_header);
1215 }
1216
1217 let remaining_headers = self.iter.len() + range_headers.len();
1219 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1220 self.spawn_parallel_tasks(range_headers);
1221 } else {
1223 if let Some(result) = self.process_small_range(range_headers).await? {
1225 return Ok(Some(result));
1226 }
1227 }
1229 }
1230 }
1231
1232 async fn process_small_range(
1236 &mut self,
1237 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1238 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1239 for header in range_headers {
1241 let (maybe_block, maybe_receipts) = self
1243 .filter_inner
1244 .eth_cache()
1245 .maybe_cached_block_and_receipts(header.hash())
1246 .await?;
1247
1248 let receipts = match maybe_receipts {
1249 Some(receipts) => receipts,
1250 None => {
1251 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1253 Some(receipts) => Arc::new(receipts),
1254 None => continue, }
1256 }
1257 };
1258
1259 if !receipts.is_empty() {
1260 self.next.push_back(ReceiptBlockResult {
1261 receipts,
1262 recovered_block: maybe_block,
1263 header,
1264 });
1265 }
1266 }
1267
1268 Ok(self.next.pop_front())
1269 }
1270
1271 fn spawn_parallel_tasks(
1276 &mut self,
1277 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1278 ) {
1279 let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1281 let header_chunks = range_headers
1282 .into_iter()
1283 .chunks(chunk_size)
1284 .into_iter()
1285 .map(|chunk| chunk.collect::<Vec<_>>())
1286 .collect::<Vec<_>>();
1287
1288 for chunk_headers in header_chunks {
1290 let filter_inner = self.filter_inner.clone();
1291 let chunk_task = Box::pin(async move {
1292 let chunk_task = tokio::task::spawn_blocking(move || {
1293 let mut chunk_results = Vec::with_capacity(chunk_headers.len());
1294
1295 for header in chunk_headers {
1296 let receipts = match filter_inner
1299 .provider()
1300 .receipts_by_block(header.hash().into())?
1301 {
1302 Some(receipts) => Arc::new(receipts),
1303 None => continue, };
1305
1306 if !receipts.is_empty() {
1307 chunk_results.push(ReceiptBlockResult {
1308 receipts,
1309 recovered_block: None,
1310 header,
1311 });
1312 }
1313 }
1314
1315 Ok(chunk_results)
1316 });
1317
1318 match chunk_task.await {
1320 Ok(Ok(chunk_results)) => Ok(chunk_results),
1321 Ok(Err(e)) => Err(e),
1322 Err(join_err) => {
1323 trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1324 Err(EthFilterError::InternalError)
1325 }
1326 }
1327 });
1328
1329 self.pending_tasks.push_back(chunk_task);
1330 }
1331 }
1332}
1333
1334#[cfg(test)]
1335mod tests {
1336 use super::*;
1337 use crate::{eth::EthApi, EthApiBuilder};
1338 use alloy_network::Ethereum;
1339 use alloy_primitives::FixedBytes;
1340 use rand::Rng;
1341 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1342 use reth_ethereum_primitives::TxType;
1343 use reth_evm_ethereum::EthEvmConfig;
1344 use reth_network_api::noop::NoopNetwork;
1345 use reth_provider::test_utils::MockEthProvider;
1346 use reth_rpc_convert::RpcConverter;
1347 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1348 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1349 use reth_tasks::Runtime;
1350 use reth_testing_utils::generators;
1351 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1352 use std::{collections::VecDeque, sync::Arc};
1353
1354 #[test]
1355 fn test_block_range_iter() {
1356 let mut rng = generators::rng();
1357
1358 let start = rng.random::<u32>() as u64;
1359 let end = start.saturating_add(rng.random::<u32>() as u64);
1360 let step = rng.random::<u16>() as u64;
1361 let range = start..=end;
1362 let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
1363 let (from, mut end) = iter.next().unwrap();
1364 assert_eq!(from, start);
1365 assert_eq!(end, (from + step).min(*range.end()));
1366
1367 for (next_from, next_end) in iter {
1368 assert_eq!(next_from, end + 1);
1370 end = next_end;
1371 }
1372
1373 assert_eq!(end, *range.end());
1374 }
1375
1376 #[expect(clippy::type_complexity)]
1378 fn build_test_eth_api(
1379 provider: MockEthProvider,
1380 ) -> EthApi<
1381 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1382 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1383 > {
1384 EthApiBuilder::new(
1385 provider.clone(),
1386 testing_pool(),
1387 NoopNetwork::default(),
1388 EthEvmConfig::new(provider.chain_spec()),
1389 )
1390 .build()
1391 }
1392
1393 #[tokio::test]
1394 async fn test_range_block_mode_empty_range() {
1395 let provider = MockEthProvider::default();
1396 let eth_api = build_test_eth_api(provider);
1397
1398 let eth_filter =
1399 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1400 let filter_inner = eth_filter.inner;
1401
1402 let headers = vec![];
1403 let max_range = 100;
1404
1405 let mut range_mode = RangeBlockMode {
1406 filter_inner,
1407 iter: headers.into_iter().peekable(),
1408 next: VecDeque::new(),
1409 max_range,
1410 pending_tasks: FuturesOrdered::new(),
1411 };
1412
1413 let result = range_mode.next().await;
1414 assert!(result.is_ok());
1415 assert!(result.unwrap().is_none());
1416 }
1417
1418 #[tokio::test]
1419 async fn test_range_block_mode_queued_results_priority() {
1420 let provider = MockEthProvider::default();
1421 let eth_api = build_test_eth_api(provider);
1422
1423 let eth_filter =
1424 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1425 let filter_inner = eth_filter.inner;
1426
1427 let headers = vec![
1428 SealedHeader::new(
1429 alloy_consensus::Header { number: 100, ..Default::default() },
1430 FixedBytes::random(),
1431 ),
1432 SealedHeader::new(
1433 alloy_consensus::Header { number: 101, ..Default::default() },
1434 FixedBytes::random(),
1435 ),
1436 ];
1437
1438 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1440 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1441
1442 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1444 tx_type: TxType::Legacy,
1445 cumulative_gas_used: 100_000,
1446 logs: vec![],
1447 success: true,
1448 };
1449 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1450 tx_type: TxType::Eip1559,
1451 cumulative_gas_used: 200_000,
1452 logs: vec![],
1453 success: true,
1454 };
1455 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1456 tx_type: TxType::Eip2930,
1457 cumulative_gas_used: 150_000,
1458 logs: vec![],
1459 success: false, };
1461
1462 let mock_result_1 = ReceiptBlockResult {
1463 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1464 recovered_block: None,
1465 header: SealedHeader::new(
1466 alloy_consensus::Header { number: 42, ..Default::default() },
1467 expected_block_hash_1,
1468 ),
1469 };
1470
1471 let mock_result_2 = ReceiptBlockResult {
1472 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1473 recovered_block: None,
1474 header: SealedHeader::new(
1475 alloy_consensus::Header { number: 43, ..Default::default() },
1476 expected_block_hash_2,
1477 ),
1478 };
1479
1480 let mut range_mode = RangeBlockMode {
1481 filter_inner,
1482 iter: headers.into_iter().peekable(),
1483 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1485 pending_tasks: FuturesOrdered::new(),
1486 };
1487
1488 let result1 = range_mode.next().await;
1490 assert!(result1.is_ok());
1491 let receipt_result1 = result1.unwrap().unwrap();
1492 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1493 assert_eq!(receipt_result1.header.number, 42);
1494
1495 assert_eq!(receipt_result1.receipts.len(), 2);
1497 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1498 assert_eq!(
1499 receipt_result1.receipts[0].cumulative_gas_used,
1500 mock_receipt_1.cumulative_gas_used
1501 );
1502 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1503 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1504 assert_eq!(
1505 receipt_result1.receipts[1].cumulative_gas_used,
1506 mock_receipt_2.cumulative_gas_used
1507 );
1508 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1509
1510 let result2 = range_mode.next().await;
1512 assert!(result2.is_ok());
1513 let receipt_result2 = result2.unwrap().unwrap();
1514 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1515 assert_eq!(receipt_result2.header.number, 43);
1516
1517 assert_eq!(receipt_result2.receipts.len(), 1);
1519 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1520 assert_eq!(
1521 receipt_result2.receipts[0].cumulative_gas_used,
1522 mock_receipt_3.cumulative_gas_used
1523 );
1524 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1525
1526 assert!(range_mode.next.is_empty());
1528
1529 let result3 = range_mode.next().await;
1530 assert!(result3.is_ok());
1531 }
1532
1533 #[tokio::test]
1534 async fn test_range_block_mode_single_block_no_receipts() {
1535 let provider = MockEthProvider::default();
1536 let eth_api = build_test_eth_api(provider);
1537
1538 let eth_filter =
1539 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1540 let filter_inner = eth_filter.inner;
1541
1542 let headers = vec![SealedHeader::new(
1543 alloy_consensus::Header { number: 100, ..Default::default() },
1544 FixedBytes::random(),
1545 )];
1546
1547 let mut range_mode = RangeBlockMode {
1548 filter_inner,
1549 iter: headers.into_iter().peekable(),
1550 next: VecDeque::new(),
1551 max_range: 100,
1552 pending_tasks: FuturesOrdered::new(),
1553 };
1554
1555 let result = range_mode.next().await;
1556 assert!(result.is_ok());
1557 }
1558
1559 #[tokio::test]
1560 async fn test_range_block_mode_provider_receipts() {
1561 let provider = MockEthProvider::default();
1562
1563 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1564 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1565 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1566
1567 let block_hash_1 = FixedBytes::random();
1568 let block_hash_2 = FixedBytes::random();
1569 let block_hash_3 = FixedBytes::random();
1570
1571 provider.add_header(block_hash_1, header_1.clone());
1572 provider.add_header(block_hash_2, header_2.clone());
1573 provider.add_header(block_hash_3, header_3.clone());
1574
1575 let mock_log = alloy_primitives::Log {
1577 address: alloy_primitives::Address::ZERO,
1578 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1579 };
1580
1581 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1582 tx_type: TxType::Legacy,
1583 cumulative_gas_used: 21_000,
1584 logs: vec![mock_log.clone()],
1585 success: true,
1586 };
1587 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1588 tx_type: TxType::Eip1559,
1589 cumulative_gas_used: 42_000,
1590 logs: vec![mock_log.clone()],
1591 success: true,
1592 };
1593 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1594 tx_type: TxType::Eip2930,
1595 cumulative_gas_used: 30_000,
1596 logs: vec![mock_log.clone()],
1597 success: false,
1598 };
1599
1600 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1601 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1602
1603 let eth_api = build_test_eth_api(provider);
1604
1605 let eth_filter =
1606 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1607 let filter_inner = eth_filter.inner;
1608
1609 let headers = vec![
1610 SealedHeader::new(header_1, block_hash_1),
1611 SealedHeader::new(header_2, block_hash_2),
1612 SealedHeader::new(header_3, block_hash_3),
1613 ];
1614
1615 let mut range_mode = RangeBlockMode {
1616 filter_inner,
1617 iter: headers.into_iter().peekable(),
1618 next: VecDeque::new(),
1619 max_range: 3, pending_tasks: FuturesOrdered::new(),
1621 };
1622
1623 let result = range_mode.next().await;
1625 assert!(result.is_ok());
1626 let receipt_result = result.unwrap().unwrap();
1627
1628 assert_eq!(receipt_result.header.hash(), block_hash_1);
1629 assert_eq!(receipt_result.header.number, 100);
1630 assert_eq!(receipt_result.receipts.len(), 2);
1631
1632 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1634 assert_eq!(
1635 receipt_result.receipts[0].cumulative_gas_used,
1636 receipt_100_1.cumulative_gas_used
1637 );
1638 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1639
1640 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1641 assert_eq!(
1642 receipt_result.receipts[1].cumulative_gas_used,
1643 receipt_100_2.cumulative_gas_used
1644 );
1645 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1646
1647 let result2 = range_mode.next().await;
1649 assert!(result2.is_ok());
1650 let receipt_result2 = result2.unwrap().unwrap();
1651
1652 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1653 assert_eq!(receipt_result2.header.number, 101);
1654 assert_eq!(receipt_result2.receipts.len(), 1);
1655
1656 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1658 assert_eq!(
1659 receipt_result2.receipts[0].cumulative_gas_used,
1660 receipt_101_1.cumulative_gas_used
1661 );
1662 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1663
1664 let result3 = range_mode.next().await;
1666 assert!(result3.is_ok());
1667 assert!(result3.unwrap().is_none());
1668 }
1669
1670 #[tokio::test]
1671 async fn test_range_block_mode_iterator_exhaustion() {
1672 let provider = MockEthProvider::default();
1673
1674 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1675 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1676
1677 let block_hash_100 = FixedBytes::random();
1678 let block_hash_101 = FixedBytes::random();
1679
1680 provider.add_header(block_hash_100, header_100.clone());
1682 provider.add_header(block_hash_101, header_101.clone());
1683
1684 let mock_receipt = reth_ethereum_primitives::Receipt {
1686 tx_type: TxType::Legacy,
1687 cumulative_gas_used: 21_000,
1688 logs: vec![],
1689 success: true,
1690 };
1691 provider.add_receipts(100, vec![mock_receipt.clone()]);
1692 provider.add_receipts(101, vec![mock_receipt.clone()]);
1693
1694 let eth_api = build_test_eth_api(provider);
1695
1696 let eth_filter =
1697 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1698 let filter_inner = eth_filter.inner;
1699
1700 let headers = vec![
1701 SealedHeader::new(header_100, block_hash_100),
1702 SealedHeader::new(header_101, block_hash_101),
1703 ];
1704
1705 let mut range_mode = RangeBlockMode {
1706 filter_inner,
1707 iter: headers.into_iter().peekable(),
1708 next: VecDeque::new(),
1709 max_range: 1,
1710 pending_tasks: FuturesOrdered::new(),
1711 };
1712
1713 let result1 = range_mode.next().await;
1714 assert!(result1.is_ok());
1715 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1720 assert!(result2.is_ok());
1721 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1725
1726 let result3 = range_mode.next().await;
1728 assert!(result3.is_ok());
1729 assert!(result3.unwrap().is_none());
1730 }
1731
1732 #[tokio::test]
1733 async fn test_cached_mode_with_mock_receipts() {
1734 let test_hash = FixedBytes::from([42u8; 32]);
1736 let test_block_number = 100u64;
1737 let test_header = SealedHeader::new(
1738 alloy_consensus::Header {
1739 number: test_block_number,
1740 gas_used: 50_000,
1741 ..Default::default()
1742 },
1743 test_hash,
1744 );
1745
1746 let mock_log = alloy_primitives::Log {
1748 address: alloy_primitives::Address::ZERO,
1749 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1750 };
1751
1752 let mock_receipt = reth_ethereum_primitives::Receipt {
1753 tx_type: TxType::Legacy,
1754 cumulative_gas_used: 21_000,
1755 logs: vec![mock_log],
1756 success: true,
1757 };
1758
1759 let provider = MockEthProvider::default();
1760 provider.add_header(test_hash, test_header.header().clone());
1761 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1762
1763 let eth_api = build_test_eth_api(provider);
1764 let eth_filter =
1765 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1766 let filter_inner = eth_filter.inner;
1767
1768 let headers = vec![test_header.clone()];
1769
1770 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1771
1772 let result = cached_mode.next().await.expect("next should succeed");
1774 let receipt_block_result = result.expect("should have receipt result");
1775 assert_eq!(receipt_block_result.header.hash(), test_hash);
1776 assert_eq!(receipt_block_result.header.number, test_block_number);
1777 assert_eq!(receipt_block_result.receipts.len(), 1);
1778 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1779 assert_eq!(
1780 receipt_block_result.receipts[0].cumulative_gas_used,
1781 mock_receipt.cumulative_gas_used
1782 );
1783 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1784
1785 let result2 = cached_mode.next().await;
1787 assert!(result2.is_ok());
1788 assert!(result2.unwrap().is_none());
1789 }
1790
1791 #[tokio::test]
1792 async fn test_cached_mode_empty_headers() {
1793 let provider = MockEthProvider::default();
1794 let eth_api = build_test_eth_api(provider);
1795
1796 let eth_filter =
1797 super::EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1798 let filter_inner = eth_filter.inner;
1799
1800 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1801
1802 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1803
1804 let result = cached_mode.next().await.expect("next should succeed");
1806 assert!(result.is_none());
1807 }
1808
1809 #[tokio::test]
1810 async fn test_non_consecutive_headers_after_bloom_filter() {
1811 let provider = MockEthProvider::default();
1812
1813 let mut expected_hashes = vec![];
1815 let mut prev_hash = alloy_primitives::B256::default();
1816
1817 use alloy_consensus::TxLegacy;
1819 use reth_ethereum_primitives::{TransactionSigned, TxType};
1820
1821 let tx_inner = TxLegacy {
1822 chain_id: Some(1),
1823 nonce: 0,
1824 gas_price: 21_000,
1825 gas_limit: 21_000,
1826 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1827 value: alloy_primitives::U256::ZERO,
1828 input: alloy_primitives::Bytes::new(),
1829 };
1830 let signature = alloy_primitives::Signature::test_signature();
1831 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1832
1833 for i in 100u64..=103 {
1834 let header = alloy_consensus::Header {
1835 number: i,
1836 parent_hash: prev_hash,
1837 logs_bloom: if i == 100 || i == 102 {
1839 alloy_primitives::Bloom::from([1u8; 256])
1840 } else {
1841 alloy_primitives::Bloom::default()
1842 },
1843 ..Default::default()
1844 };
1845
1846 let hash = header.hash_slow();
1847 expected_hashes.push(hash);
1848 prev_hash = hash;
1849
1850 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1852
1853 let block = reth_ethereum_primitives::Block {
1854 header,
1855 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1856 };
1857 provider.add_block(hash, block);
1858 }
1859
1860 let mock_log = alloy_primitives::Log {
1862 address: alloy_primitives::Address::ZERO,
1863 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1864 };
1865
1866 let receipt = reth_ethereum_primitives::Receipt {
1867 tx_type: TxType::Legacy,
1868 cumulative_gas_used: 21_000,
1869 logs: vec![mock_log],
1870 success: true,
1871 };
1872
1873 provider.add_receipts(100, vec![receipt.clone()]);
1874 provider.add_receipts(101, vec![]);
1875 provider.add_receipts(102, vec![receipt.clone()]);
1876 provider.add_receipts(103, vec![]);
1877
1878 use reth_db_api::models::StoredBlockBodyIndices;
1880 provider
1881 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1882 provider
1883 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1884 provider
1885 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1886 provider
1887 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1888
1889 let eth_api = build_test_eth_api(provider);
1890 let eth_filter = EthFilter::new(eth_api, EthFilterConfig::default(), Runtime::test());
1891
1892 let filter = Filter::default();
1894
1895 let logs = eth_filter
1897 .inner
1898 .clone()
1899 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1900 .await
1901 .expect("should succeed");
1902
1903 assert_eq!(logs.len(), 2);
1905
1906 assert_eq!(logs[0].block_number, Some(100));
1907 assert_eq!(logs[1].block_number, Some(102));
1908
1909 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1913}