1use alloy_consensus::BlockHeader;
4use alloy_primitives::{Sealable, TxHash};
5use alloy_rpc_types_eth::{
6 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7 PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::{
11 future::TryFutureExt,
12 stream::{FuturesOrdered, StreamExt},
13 Future,
14};
15use itertools::Itertools;
16use jsonrpsee::{core::RpcResult, server::IdProvider};
17use reth_errors::ProviderError;
18use reth_primitives_traits::{NodePrimitives, SealedHeader};
19use reth_rpc_eth_api::{
20 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
21 RpcNodeCoreExt, RpcTransaction,
22};
23use reth_rpc_eth_types::{
24 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
25 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
26};
27use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
28use reth_storage_api::{
29 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
30 ProviderReceipt, ReceiptProvider,
31};
32use reth_tasks::TaskSpawner;
33use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
34use std::{
35 collections::{HashMap, VecDeque},
36 fmt,
37 iter::{Peekable, StepBy},
38 ops::RangeInclusive,
39 pin::Pin,
40 sync::Arc,
41 time::{Duration, Instant},
42};
43use tokio::{
44 sync::{mpsc::Receiver, oneshot, Mutex},
45 time::MissedTickBehavior,
46};
47use tracing::{debug, error, trace};
48
49impl<Eth> EngineEthFilter for EthFilter<Eth>
50where
51 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
52{
53 fn logs(
55 &self,
56 filter: Filter,
57 limits: QueryLimits,
58 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
59 trace!(target: "rpc::eth", "Serving eth_getLogs");
60 self.logs_for_filter(filter, limits).map_err(|e| e.into())
61 }
62}
63
/// Base threshold (in blocks) for serving a range query through the cache: both the number of
/// requested blocks and the distance of the last requested block from the chain tip must not
/// exceed the (possibly bloom-adjusted) threshold.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// More bloom-matching headers than this halves the cached-mode threshold.
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// More bloom-matching headers than this (but not more than the high threshold) reduces the
/// cached-mode threshold to three quarters of its base value.
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Ranges of at most this many blocks never get a bloom-based threshold adjustment.
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Step used when splitting a block range into header range requests; also the maximum number
/// of consecutive headers grouped into a single receipt-fetch range.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// When at least this many matching headers are still to be processed, receipts are fetched by
/// parallel blocking tasks instead of sequentially.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1_000;

/// Divisor used to size the header chunks handed to the parallel receipt-fetch tasks.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
84
85pub struct EthFilter<Eth: EthApiTypes> {
89 inner: Arc<EthFilterInner<Eth>>,
91}
92
93impl<Eth> Clone for EthFilter<Eth>
94where
95 Eth: EthApiTypes,
96{
97 fn clone(&self) -> Self {
98 Self { inner: self.inner.clone() }
99 }
100}
101
102impl<Eth> EthFilter<Eth>
103where
104 Eth: EthApiTypes + 'static,
105{
106 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
134 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
135 config;
136 let inner = EthFilterInner {
137 eth_api,
138 active_filters: ActiveFilters::new(),
139 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
140 max_headers_range: MAX_HEADERS_RANGE,
141 task_spawner,
142 stale_filter_ttl,
143 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
144 };
145
146 let eth_filter = Self { inner: Arc::new(inner) };
147
148 let this = eth_filter.clone();
149 eth_filter.inner.task_spawner.spawn_critical(
150 "eth-filters_stale-filters-clean",
151 Box::pin(async move {
152 this.watch_and_clear_stale_filters().await;
153 }),
154 );
155
156 eth_filter
157 }
158
159 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
161 &self.inner.active_filters
162 }
163
164 async fn watch_and_clear_stale_filters(&self) {
167 let mut interval = tokio::time::interval_at(
168 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
169 self.inner.stale_filter_ttl,
170 );
171 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
172 loop {
173 interval.tick().await;
174 self.clear_stale_filters(Instant::now()).await;
175 }
176 }
177
178 pub async fn clear_stale_filters(&self, now: Instant) {
181 trace!(target: "rpc::eth", "clear stale filters");
182 self.active_filters().inner.lock().await.retain(|id, filter| {
183 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
184
185 if !is_valid {
186 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
187 }
188
189 is_valid
190 })
191 }
192}
193
194impl<Eth> EthFilter<Eth>
195where
196 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
197{
198 fn provider(&self) -> &Eth::Provider {
200 self.inner.eth_api.provider()
201 }
202
203 fn pool(&self) -> &Eth::Pool {
205 self.inner.eth_api.pool()
206 }
207
208 pub async fn filter_changes(
210 &self,
211 id: FilterId,
212 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
213 let info = self.provider().chain_info()?;
214 let best_number = info.best_number;
215
216 let (start_block, kind) = {
219 let mut filters = self.inner.active_filters.inner.lock().await;
220 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
221
222 if filter.block > best_number {
223 return Ok(FilterChanges::Empty)
225 }
226
227 let mut block = best_number + 1;
231 std::mem::swap(&mut filter.block, &mut block);
232 filter.last_poll_timestamp = Instant::now();
233
234 (block, filter.kind.clone())
235 };
236
237 match kind {
238 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
239 FilterKind::Block => {
240 let end_block = best_number + 1;
243 let block_hashes =
244 self.provider().canonical_hashes_range(start_block, end_block).map_err(
245 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
246 )?;
247 Ok(FilterChanges::Hashes(block_hashes))
248 }
249 FilterKind::Log(filter) => {
250 let (from_block_number, to_block_number) = match filter.block_option {
251 FilterBlockOption::Range { from_block, to_block } => {
252 let from = from_block
253 .map(|num| self.provider().convert_block_number(num))
254 .transpose()?
255 .flatten();
256 let to = to_block
257 .map(|num| self.provider().convert_block_number(num))
258 .transpose()?
259 .flatten();
260 logs_utils::get_filter_block_range(from, to, start_block, info)
261 }
262 FilterBlockOption::AtBlockHash(_) => {
263 (start_block, best_number)
267 }
268 };
269 let logs = self
270 .inner
271 .clone()
272 .get_logs_in_block_range(
273 *filter,
274 from_block_number,
275 to_block_number,
276 self.inner.query_limits,
277 )
278 .await?;
279 Ok(FilterChanges::Logs(logs))
280 }
281 }
282 }
283
284 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
290 let filter = {
291 let filters = self.inner.active_filters.inner.lock().await;
292 if let FilterKind::Log(ref filter) =
293 filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
294 {
295 *filter.clone()
296 } else {
297 return Err(EthFilterError::FilterNotFound(id))
299 }
300 };
301
302 self.logs_for_filter(filter, self.inner.query_limits).await
303 }
304
305 async fn logs_for_filter(
307 &self,
308 filter: Filter,
309 limits: QueryLimits,
310 ) -> Result<Vec<Log>, EthFilterError> {
311 self.inner.clone().logs_for_filter(filter, limits).await
312 }
313}
314
315#[async_trait]
316impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
317where
318 Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
319{
320 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
322 trace!(target: "rpc::eth", "Serving eth_newFilter");
323 self.inner
324 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
325 .await
326 }
327
328 async fn new_block_filter(&self) -> RpcResult<FilterId> {
330 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
331 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
332 }
333
334 async fn new_pending_transaction_filter(
336 &self,
337 kind: Option<PendingTransactionFilterKind>,
338 ) -> RpcResult<FilterId> {
339 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
340
341 let transaction_kind = match kind.unwrap_or_default() {
342 PendingTransactionFilterKind::Hashes => {
343 let receiver = self.pool().pending_transactions_listener();
344 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
345 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
346 }
347 PendingTransactionFilterKind::Full => {
348 let stream = self.pool().new_pending_pool_transactions_listener();
349 let full_txs_receiver = FullTransactionsReceiver::new(
350 stream,
351 dyn_clone::clone(self.inner.eth_api.tx_resp_builder()),
352 );
353 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
354 full_txs_receiver,
355 )))
356 }
357 };
358
359 self.inner.install_filter(transaction_kind).await
363 }
364
365 async fn filter_changes(
367 &self,
368 id: FilterId,
369 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
370 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
371 Ok(Self::filter_changes(self, id).await?)
372 }
373
374 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
380 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
381 Ok(Self::filter_logs(self, id).await?)
382 }
383
384 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
386 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
387 let mut filters = self.inner.active_filters.inner.lock().await;
388 if filters.remove(&id).is_some() {
389 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
390 Ok(true)
391 } else {
392 Ok(false)
393 }
394 }
395
396 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
400 trace!(target: "rpc::eth", "Serving eth_getLogs");
401 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
402 }
403}
404
405impl<Eth> std::fmt::Debug for EthFilter<Eth>
406where
407 Eth: EthApiTypes,
408{
409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410 f.debug_struct("EthFilter").finish_non_exhaustive()
411 }
412}
413
/// Shared state behind every [`EthFilter`] handle.
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// The `eth` API instance that supplies the provider, pool and cache.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Generates the ids handed out for newly installed filters.
    id_provider: Arc<dyn IdProvider>,
    /// Limits applied to log queries issued through this handler.
    query_limits: QueryLimits,
    /// Step used when a block range is split into header range requests.
    max_headers_range: u64,
    /// Spawner used for the stale-filter cleanup task and for blocking log queries.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration after which a filter that has not been polled is considered stale.
    stale_filter_ttl: Duration,
}
432
impl<Eth> EthFilterInner<Eth>
where
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
        + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
        + 'static,
{
    /// Access the underlying provider.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }

    /// Access the underlying state cache.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }

    /// Returns all logs matching the given filter, enforcing `limits` for range queries.
    ///
    /// A block-hash filter is answered from that single block; a range filter resolves its
    /// bounds against the current chain info and delegates to `get_logs_in_block_range`.
    async fn logs_for_filter(
        self: Arc<Self>,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        match filter.block_option {
            FilterBlockOption::AtBlockHash(block_hash) => {
                // The header is needed for the block number and timestamp.
                let header = self
                    .provider()
                    .header_by_hash_or_number(block_hash.into())?
                    .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;

                let block_num_hash = BlockNumHash::new(header.number(), block_hash);

                // Missing receipts are reported as a "header not found" error rather than an
                // empty result.
                let (receipts, maybe_block) = self
                    .eth_cache()
                    .get_receipts_and_maybe_block(block_num_hash.hash)
                    .await?
                    .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;

                let mut all_logs = Vec::new();
                append_matching_block_logs(
                    &mut all_logs,
                    // Prefer the block returned alongside the receipts; otherwise hand over the
                    // provider so the helper can look up what it needs.
                    maybe_block
                        .map(ProviderOrBlock::Block)
                        .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                    &filter,
                    block_num_hash,
                    &receipts,
                    // NOTE(review): presumably the `removed` flag of the emitted logs — confirm
                    // against `append_matching_block_logs`.
                    false,
                    header.timestamp(),
                )?;

                Ok(all_logs)
            }
            FilterBlockOption::Range { from_block, to_block } => {
                let info = self.provider().chain_info()?;

                // For a one-off query the best block plays the role of the start block.
                let start_block = info.best_number;
                let from = from_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();
                let to = to_block
                    .map(|num| self.provider().convert_block_number(num))
                    .transpose()?
                    .flatten();

                if let Some(f) = from &&
                    f > info.best_number
                {
                    // The requested start lies beyond the local head: nothing to return.
                    return Ok(Vec::new());
                }

                let (from_block_number, to_block_number) =
                    logs_utils::get_filter_block_range(from, to, start_block, info);

                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
                    .await
            }
        }
    }

    /// Installs a new filter of the given kind and returns its freshly generated id.
    ///
    /// The filter starts at the best block number known at installation time.
    async fn install_filter(
        &self,
        kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
    ) -> RpcResult<FilterId> {
        let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
        let subscription_id = self.id_provider.next_id();

        // Map the subscription id onto the equivalent filter id variant.
        let id = match subscription_id {
            jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
            jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
        };
        let mut filters = self.active_filters.inner.lock().await;
        filters.insert(
            id.clone(),
            ActiveFilter {
                block: last_poll_block_number,
                last_poll_timestamp: Instant::now(),
                kind,
            },
        );
        Ok(id)
    }

    /// Returns all logs in the inclusive range `from_block..=to_block` that match the filter.
    ///
    /// # Errors
    ///
    /// - [`EthFilterError::InvalidBlockRangeParams`] if `to_block < from_block`.
    /// - [`EthFilterError::QueryExceedsMaxBlocks`] if `to_block - from_block` is greater than
    ///   the configured `max_blocks_per_filter`.
    /// - [`EthFilterError::InternalError`] if the spawned task drops the result channel.
    async fn get_logs_in_block_range(
        self: Arc<Self>,
        filter: Filter,
        from_block: u64,
        to_block: u64,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");

        // Boundary checks come first; the subtraction below relies on `to_block >= from_block`.
        if to_block < from_block {
            return Err(EthFilterError::InvalidBlockRangeParams)
        }

        if let Some(max_blocks_per_filter) =
            limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
        {
            return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
        }

        // Run the query on a blocking task and receive its result over a oneshot channel.
        let (tx, rx) = oneshot::channel();
        let this = self.clone();
        self.task_spawner.spawn_blocking(Box::pin(async move {
            let res =
                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
            // A send failure only means the receiver is gone, so the result is discarded.
            let _ = tx.send(res);
        }));

        rx.await.map_err(|_| EthFilterError::InternalError)?
    }

    /// Collects the logs of all blocks in `from_block..=to_block` that match `filter`.
    ///
    /// Headers are fetched in chunks and pre-filtered by their logs bloom; receipts are then
    /// loaded only for the matching headers through a [`RangeMode`].
    ///
    /// # Errors
    ///
    /// Returns [`EthFilterError::QueryExceedsMaxResults`] if the range spans more than one block
    /// and the number of collected logs exceeds `limits.max_logs_per_response`.
    async fn get_logs_in_block_range_inner(
        self: Arc<Self>,
        filter: &Filter,
        from_block: u64,
        to_block: u64,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        let mut all_logs = Vec::new();
        let mut matching_headers = Vec::new();

        // The chain tip is used to decide how receipts are fetched (see `RangeMode::new`).
        let chain_tip = self.provider().best_block_number()?;

        // First pass: walk the headers in chunks and keep those whose bloom matches.
        for (from, to) in
            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
        {
            let headers = self.provider().headers_range(from..=to)?;

            let mut headers_iter = headers.into_iter().peekable();

            while let Some(header) = headers_iter.next() {
                if !filter.matches_bloom(header.logs_bloom()) {
                    continue
                }

                let current_number = header.number();

                let block_hash = match headers_iter.peek() {
                    Some(next_header) if next_header.number() == current_number + 1 => {
                        // The direct successor already carries this block's hash as its
                        // parent hash, so no hashing is required.
                        next_header.parent_hash()
                    }
                    _ => {
                        // No direct successor in this chunk: compute the hash from the header.
                        header.hash_slow()
                    }
                };

                matching_headers.push(SealedHeader::new(header, block_hash));
            }
        }

        // Second pass: load receipts for the matching headers and extract the logs.
        let mut range_mode = RangeMode::new(
            self.clone(),
            matching_headers,
            from_block,
            to_block,
            self.max_headers_range,
            chain_tip,
        );

        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
            range_mode.next().await?
        {
            let num_hash = header.num_hash();
            append_matching_block_logs(
                &mut all_logs,
                recovered_block
                    .map(ProviderOrBlock::Block)
                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                filter,
                num_hash,
                &receipts,
                // NOTE(review): presumably the `removed` flag of the emitted logs — confirm
                // against `append_matching_block_logs`.
                false,
                header.timestamp(),
            )?;

            // The log limit is only enforced for queries spanning more than one block.
            let is_multi_block_range = from_block != to_block;
            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
                is_multi_block_range &&
                all_logs.len() > max_logs_per_response
            {
                debug!(
                    target: "rpc::eth::filter",
                    logs_found = all_logs.len(),
                    max_logs_per_response,
                    from_block,
                    to_block = num_hash.number.saturating_sub(1),
                    "Query exceeded max logs per response limit"
                );
                // Suggest retrying up to the block before the one that exceeded the limit.
                return Err(EthFilterError::QueryExceedsMaxResults {
                    max_logs: max_logs_per_response,
                    from_block,
                    to_block: num_hash.number.saturating_sub(1),
                });
            }
        }

        Ok(all_logs)
    }
}
684
685#[derive(Debug, Clone, Default)]
687pub struct ActiveFilters<T> {
688 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
689}
690
691impl<T> ActiveFilters<T> {
692 pub fn new() -> Self {
694 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
695 }
696}
697
/// An installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// Block number the next poll starts from.
    block: u64,
    /// When the filter was installed or last polled; used to detect stale filters.
    last_poll_timestamp: Instant,
    /// What the filter reports.
    kind: FilterKind<T>,
}
708
709#[derive(Debug, Clone)]
711struct PendingTransactionsReceiver {
712 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
713}
714
715impl PendingTransactionsReceiver {
716 fn new(receiver: Receiver<TxHash>) -> Self {
717 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
718 }
719
720 async fn drain<T>(&self) -> FilterChanges<T> {
722 let mut pending_txs = Vec::new();
723 let mut prepared_stream = self.txs_receiver.lock().await;
724
725 while let Ok(tx_hash) = prepared_stream.try_recv() {
726 pending_txs.push(tx_hash);
727 }
728
729 FilterChanges::Hashes(pending_txs)
731 }
732}
733
734#[derive(Debug, Clone)]
736struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
737 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
738 tx_resp_builder: TxCompat,
739}
740
741impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
742where
743 T: PoolTransaction + 'static,
744 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
745{
746 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
748 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
749 }
750
751 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
753 let mut pending_txs = Vec::new();
754 let mut prepared_stream = self.txs_stream.lock().await;
755
756 while let Ok(tx) = prepared_stream.try_recv() {
757 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
758 Ok(tx) => pending_txs.push(tx),
759 Err(err) => {
760 error!(target: "rpc",
761 %err,
762 "Failed to fill txn with block context"
763 );
764 }
765 }
766 }
767 FilterChanges::Transactions(pending_txs)
768 }
769}
770
771#[async_trait]
773trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
774 async fn drain(&self) -> FilterChanges<T>;
775}
776
777#[async_trait]
778impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
779 for FullTransactionsReceiver<T, TxCompat>
780where
781 T: PoolTransaction + 'static,
782 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
783{
784 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
785 Self::drain(self).await
786 }
787}
788
789#[derive(Debug, Clone)]
795enum PendingTransactionKind<T> {
796 Hashes(PendingTransactionsReceiver),
797 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
798}
799
800impl<T: 'static> PendingTransactionKind<T> {
801 async fn drain(&self) -> FilterChanges<T> {
802 match self {
803 Self::Hashes(receiver) => receiver.drain().await,
804 Self::FullTransaction(receiver) => receiver.drain().await,
805 }
806 }
807}
808
/// What an installed filter reports when it is polled.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// Logs matching the boxed filter.
    Log(Box<Filter>),
    /// Hashes of new blocks.
    Block,
    /// Pending transactions, as hashes or full transactions.
    PendingTransaction(PendingTransactionKind<T>),
}
815
/// Splits an inclusive block range into consecutive inclusive sub-ranges `(start, end)`, each
/// spanning at most `step + 1` blocks.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the first block of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the first and the last block of a full sub-range.
    step: u64,
    /// Last block of the overall range.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates an iterator over `range` whose sub-ranges cover `step + 1` blocks each (the last
    /// one may be shorter).
    ///
    /// A `step` so large that `step + 1` is not representable as `usize` is clamped to
    /// `usize::MAX - 1`; the yielded sub-ranges still cover `range` exactly once.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        // Clamp so that the stride below can neither overflow nor be truncated by a cast.
        let step = usize::try_from(step).unwrap_or(usize::MAX).min(usize::MAX - 1);
        // `usize` -> `u64` is lossless on all supported targets.
        Self { end: *range.end(), iter: range.step_by(step + 1), step: step as u64 }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    /// Returns the next inclusive sub-range, or `None` once the range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate instead of overflowing for ranges that reach up to `u64::MAX`.
        let end = start.saturating_add(self.step).min(self.end);
        if start > end {
            return None
        }
        Some((start, end))
    }
}
842
/// Errors that can occur while serving filter requests.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// No filter (of the expected kind) is installed under the given id.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// The end of the requested block range lies before its start.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// The requested block range is larger than the configured maximum.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// The query produced more logs than the configured maximum.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response.
        max_logs: usize,
        /// Start of the block range suggested for a retry.
        from_block: u64,
        /// End of the block range suggested for a retry.
        to_block: u64,
    },
    /// An error from the underlying `eth` API.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// An internal failure while processing the request.
    #[error("internal filter error")]
    InternalError,
}
872
873impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
874 fn from(err: EthFilterError) -> Self {
875 match err {
876 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
877 jsonrpsee::types::error::INVALID_PARAMS_CODE,
878 "filter not found",
879 ),
880 err @ EthFilterError::InternalError => {
881 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
882 }
883 EthFilterError::EthAPIError(err) => err.into(),
884 err @ (EthFilterError::InvalidBlockRangeParams |
885 EthFilterError::QueryExceedsMaxBlocks(_) |
886 EthFilterError::QueryExceedsMaxResults { .. }) => {
887 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
888 }
889 }
890 }
891}
892
893impl From<ProviderError> for EthFilterError {
894 fn from(err: ProviderError) -> Self {
895 Self::EthAPIError(err.into())
896 }
897}
898
/// Receipts of a single block, together with the data needed to turn them into logs.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// All receipts of the block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// The recovered block, when it was available alongside the receipts.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// Sealed header of the block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
912
913enum RangeMode<
915 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
916> {
917 Cached(CachedMode<Eth>),
919 Range(RangeBlockMode<Eth>),
921}
922
923impl<
924 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
925 > RangeMode<Eth>
926{
927 fn new(
929 filter_inner: Arc<EthFilterInner<Eth>>,
930 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
931 from_block: u64,
932 to_block: u64,
933 max_headers_range: u64,
934 chain_tip: u64,
935 ) -> Self {
936 let block_count = to_block - from_block + 1;
937 let distance_from_tip = chain_tip.saturating_sub(to_block);
938
939 let use_cached_mode =
941 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
942
943 if use_cached_mode && !sealed_headers.is_empty() {
944 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
945 } else {
946 Self::Range(RangeBlockMode {
947 filter_inner,
948 iter: sealed_headers.into_iter().peekable(),
949 next: VecDeque::new(),
950 max_range: max_headers_range as usize,
951 pending_tasks: FuturesOrdered::new(),
952 })
953 }
954 }
955
956 const fn should_use_cached_mode(
958 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
959 block_count: u64,
960 distance_from_tip: u64,
961 ) -> bool {
962 let bloom_matches = headers.len();
964
965 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
967
968 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
969 }
970
971 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
973 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
975 return CACHED_MODE_BLOCK_THRESHOLD;
976 }
977
978 match bloom_matches {
979 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
980 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
981 _ => CACHED_MODE_BLOCK_THRESHOLD,
982 }
983 }
984
985 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
987 match self {
988 Self::Cached(cached) => cached.next().await,
989 Self::Range(range) => range.next().await,
990 }
991 }
992}
993
994struct CachedMode<
996 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
997> {
998 filter_inner: Arc<EthFilterInner<Eth>>,
999 headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1000}
1001
1002impl<
1003 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1004 > CachedMode<Eth>
1005{
1006 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1007 for header in self.headers_iter.by_ref() {
1008 if let Some((receipts, maybe_block)) =
1010 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1011 {
1012 return Ok(Some(ReceiptBlockResult {
1013 receipts,
1014 recovered_block: maybe_block,
1015 header,
1016 }));
1017 }
1018 }
1019
1020 Ok(None) }
1022}
1023
/// Boxed future that resolves to the receipts of one chunk of headers.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;

/// Receipt loading strategy that processes headers in ranges of consecutive blocks.
struct RangeBlockMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state, used to reach the cache and the provider.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Headers that still have to be processed.
    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
    /// Results that are ready to be handed out.
    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
    /// Maximum number of headers grouped into one range.
    max_range: usize,
    /// Receipt-fetch tasks in flight; results are consumed in the order tasks were queued.
    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
}
1039
1040impl<
1041 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1042 > RangeBlockMode<Eth>
1043{
1044 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1045 loop {
1046 if let Some(result) = self.next.pop_front() {
1048 return Ok(Some(result));
1049 }
1050
1051 if let Some(task_result) = self.pending_tasks.next().await {
1053 self.next.extend(task_result?);
1054 continue;
1055 }
1056
1057 let Some(next_header) = self.iter.next() else {
1059 return Ok(None);
1061 };
1062
1063 let mut range_headers = Vec::with_capacity(self.max_range);
1064 range_headers.push(next_header);
1065
1066 while range_headers.len() < self.max_range {
1068 let Some(peeked) = self.iter.peek() else { break };
1069 let Some(last_header) = range_headers.last() else { break };
1070
1071 let expected_next = last_header.number() + 1;
1072 if peeked.number() != expected_next {
1073 debug!(
1074 target: "rpc::eth::filter",
1075 last_block = last_header.number(),
1076 next_block = peeked.number(),
1077 expected = expected_next,
1078 range_size = range_headers.len(),
1079 "Non-consecutive block detected, stopping range collection"
1080 );
1081 break; }
1083
1084 let Some(next_header) = self.iter.next() else { break };
1085 range_headers.push(next_header);
1086 }
1087
1088 let remaining_headers = self.iter.len() + range_headers.len();
1090 if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
1091 self.spawn_parallel_tasks(range_headers);
1092 } else {
1094 if let Some(result) = self.process_small_range(range_headers).await? {
1096 return Ok(Some(result));
1097 }
1098 }
1100 }
1101 }
1102
1103 async fn process_small_range(
1107 &mut self,
1108 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1109 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1110 for header in range_headers {
1112 let (maybe_block, maybe_receipts) = self
1114 .filter_inner
1115 .eth_cache()
1116 .maybe_cached_block_and_receipts(header.hash())
1117 .await?;
1118
1119 let receipts = match maybe_receipts {
1120 Some(receipts) => receipts,
1121 None => {
1122 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1124 Some(receipts) => Arc::new(receipts),
1125 None => continue, }
1127 }
1128 };
1129
1130 if !receipts.is_empty() {
1131 self.next.push_back(ReceiptBlockResult {
1132 receipts,
1133 recovered_block: maybe_block,
1134 header,
1135 });
1136 }
1137 }
1138
1139 Ok(self.next.pop_front())
1140 }
1141
1142 fn spawn_parallel_tasks(
1147 &mut self,
1148 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1149 ) {
1150 let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
1152 let header_chunks = range_headers
1153 .into_iter()
1154 .chunks(chunk_size)
1155 .into_iter()
1156 .map(|chunk| chunk.collect::<Vec<_>>())
1157 .collect::<Vec<_>>();
1158
1159 for chunk_headers in header_chunks {
1161 let filter_inner = self.filter_inner.clone();
1162 let chunk_task = Box::pin(async move {
1163 let chunk_task = tokio::task::spawn_blocking(move || {
1164 let mut chunk_results = Vec::new();
1165
1166 for header in chunk_headers {
1167 let receipts = match filter_inner
1170 .provider()
1171 .receipts_by_block(header.hash().into())?
1172 {
1173 Some(receipts) => Arc::new(receipts),
1174 None => continue, };
1176
1177 if !receipts.is_empty() {
1178 chunk_results.push(ReceiptBlockResult {
1179 receipts,
1180 recovered_block: None,
1181 header,
1182 });
1183 }
1184 }
1185
1186 Ok(chunk_results)
1187 });
1188
1189 match chunk_task.await {
1191 Ok(Ok(chunk_results)) => Ok(chunk_results),
1192 Ok(Err(e)) => Err(e),
1193 Err(join_err) => {
1194 trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
1195 Err(EthFilterError::InternalError)
1196 }
1197 }
1198 });
1199
1200 self.pending_tasks.push_back(chunk_task);
1201 }
1202 }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207 use super::*;
1208 use crate::{eth::EthApi, EthApiBuilder};
1209 use alloy_network::Ethereum;
1210 use alloy_primitives::FixedBytes;
1211 use rand::Rng;
1212 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1213 use reth_ethereum_primitives::TxType;
1214 use reth_evm_ethereum::EthEvmConfig;
1215 use reth_network_api::noop::NoopNetwork;
1216 use reth_provider::test_utils::MockEthProvider;
1217 use reth_rpc_convert::RpcConverter;
1218 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1219 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1220 use reth_tasks::TokioTaskExecutor;
1221 use reth_testing_utils::generators;
1222 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1223 use std::{collections::VecDeque, sync::Arc};
1224
/// Checks that the sub-ranges of a random range start at its first block, are contiguous and
/// end at its last block.
#[test]
fn test_block_range_iter() {
    let mut rng = generators::rng();

    let first_block = rng.random::<u32>() as u64;
    let last_block = first_block.saturating_add(rng.random::<u32>() as u64);
    let step = rng.random::<u16>() as u64;

    let mut chunks = BlockRangeInclusiveIter::new(first_block..=last_block, step);

    let (from, mut previous_end) = chunks.next().unwrap();
    assert_eq!(from, first_block);
    assert_eq!(previous_end, (from + step).min(last_block));

    for (next_from, next_end) in chunks {
        // Each chunk starts right after the previous one.
        assert_eq!(next_from, previous_end + 1);
        previous_end = next_end;
    }

    assert_eq!(previous_end, last_block);
}
1246
1247 #[expect(clippy::type_complexity)]
1249 fn build_test_eth_api(
1250 provider: MockEthProvider,
1251 ) -> EthApi<
1252 RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
1253 RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
1254 > {
1255 EthApiBuilder::new(
1256 provider.clone(),
1257 testing_pool(),
1258 NoopNetwork::default(),
1259 EthEvmConfig::new(provider.chain_spec()),
1260 )
1261 .build()
1262 }
1263
1264 #[tokio::test]
1265 async fn test_range_block_mode_empty_range() {
1266 let provider = MockEthProvider::default();
1267 let eth_api = build_test_eth_api(provider);
1268
1269 let eth_filter = super::EthFilter::new(
1270 eth_api,
1271 EthFilterConfig::default(),
1272 Box::new(TokioTaskExecutor::default()),
1273 );
1274 let filter_inner = eth_filter.inner;
1275
1276 let headers = vec![];
1277 let max_range = 100;
1278
1279 let mut range_mode = RangeBlockMode {
1280 filter_inner,
1281 iter: headers.into_iter().peekable(),
1282 next: VecDeque::new(),
1283 max_range,
1284 pending_tasks: FuturesOrdered::new(),
1285 };
1286
1287 let result = range_mode.next().await;
1288 assert!(result.is_ok());
1289 assert!(result.unwrap().is_none());
1290 }
1291
1292 #[tokio::test]
1293 async fn test_range_block_mode_queued_results_priority() {
1294 let provider = MockEthProvider::default();
1295 let eth_api = build_test_eth_api(provider);
1296
1297 let eth_filter = super::EthFilter::new(
1298 eth_api,
1299 EthFilterConfig::default(),
1300 Box::new(TokioTaskExecutor::default()),
1301 );
1302 let filter_inner = eth_filter.inner;
1303
1304 let headers = vec![
1305 SealedHeader::new(
1306 alloy_consensus::Header { number: 100, ..Default::default() },
1307 FixedBytes::random(),
1308 ),
1309 SealedHeader::new(
1310 alloy_consensus::Header { number: 101, ..Default::default() },
1311 FixedBytes::random(),
1312 ),
1313 ];
1314
1315 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1317 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1318
1319 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1321 tx_type: TxType::Legacy,
1322 cumulative_gas_used: 100_000,
1323 logs: vec![],
1324 success: true,
1325 };
1326 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1327 tx_type: TxType::Eip1559,
1328 cumulative_gas_used: 200_000,
1329 logs: vec![],
1330 success: true,
1331 };
1332 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1333 tx_type: TxType::Eip2930,
1334 cumulative_gas_used: 150_000,
1335 logs: vec![],
1336 success: false, };
1338
1339 let mock_result_1 = ReceiptBlockResult {
1340 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1341 recovered_block: None,
1342 header: SealedHeader::new(
1343 alloy_consensus::Header { number: 42, ..Default::default() },
1344 expected_block_hash_1,
1345 ),
1346 };
1347
1348 let mock_result_2 = ReceiptBlockResult {
1349 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1350 recovered_block: None,
1351 header: SealedHeader::new(
1352 alloy_consensus::Header { number: 43, ..Default::default() },
1353 expected_block_hash_2,
1354 ),
1355 };
1356
1357 let mut range_mode = RangeBlockMode {
1358 filter_inner,
1359 iter: headers.into_iter().peekable(),
1360 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1362 pending_tasks: FuturesOrdered::new(),
1363 };
1364
1365 let result1 = range_mode.next().await;
1367 assert!(result1.is_ok());
1368 let receipt_result1 = result1.unwrap().unwrap();
1369 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1370 assert_eq!(receipt_result1.header.number, 42);
1371
1372 assert_eq!(receipt_result1.receipts.len(), 2);
1374 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1375 assert_eq!(
1376 receipt_result1.receipts[0].cumulative_gas_used,
1377 mock_receipt_1.cumulative_gas_used
1378 );
1379 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1380 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1381 assert_eq!(
1382 receipt_result1.receipts[1].cumulative_gas_used,
1383 mock_receipt_2.cumulative_gas_used
1384 );
1385 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1386
1387 let result2 = range_mode.next().await;
1389 assert!(result2.is_ok());
1390 let receipt_result2 = result2.unwrap().unwrap();
1391 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1392 assert_eq!(receipt_result2.header.number, 43);
1393
1394 assert_eq!(receipt_result2.receipts.len(), 1);
1396 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1397 assert_eq!(
1398 receipt_result2.receipts[0].cumulative_gas_used,
1399 mock_receipt_3.cumulative_gas_used
1400 );
1401 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1402
1403 assert!(range_mode.next.is_empty());
1405
1406 let result3 = range_mode.next().await;
1407 assert!(result3.is_ok());
1408 }
1409
1410 #[tokio::test]
1411 async fn test_range_block_mode_single_block_no_receipts() {
1412 let provider = MockEthProvider::default();
1413 let eth_api = build_test_eth_api(provider);
1414
1415 let eth_filter = super::EthFilter::new(
1416 eth_api,
1417 EthFilterConfig::default(),
1418 Box::new(TokioTaskExecutor::default()),
1419 );
1420 let filter_inner = eth_filter.inner;
1421
1422 let headers = vec![SealedHeader::new(
1423 alloy_consensus::Header { number: 100, ..Default::default() },
1424 FixedBytes::random(),
1425 )];
1426
1427 let mut range_mode = RangeBlockMode {
1428 filter_inner,
1429 iter: headers.into_iter().peekable(),
1430 next: VecDeque::new(),
1431 max_range: 100,
1432 pending_tasks: FuturesOrdered::new(),
1433 };
1434
1435 let result = range_mode.next().await;
1436 assert!(result.is_ok());
1437 }
1438
1439 #[tokio::test]
1440 async fn test_range_block_mode_provider_receipts() {
1441 let provider = MockEthProvider::default();
1442
1443 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1444 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1445 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1446
1447 let block_hash_1 = FixedBytes::random();
1448 let block_hash_2 = FixedBytes::random();
1449 let block_hash_3 = FixedBytes::random();
1450
1451 provider.add_header(block_hash_1, header_1.clone());
1452 provider.add_header(block_hash_2, header_2.clone());
1453 provider.add_header(block_hash_3, header_3.clone());
1454
1455 let mock_log = alloy_primitives::Log {
1457 address: alloy_primitives::Address::ZERO,
1458 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1459 };
1460
1461 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1462 tx_type: TxType::Legacy,
1463 cumulative_gas_used: 21_000,
1464 logs: vec![mock_log.clone()],
1465 success: true,
1466 };
1467 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1468 tx_type: TxType::Eip1559,
1469 cumulative_gas_used: 42_000,
1470 logs: vec![mock_log.clone()],
1471 success: true,
1472 };
1473 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1474 tx_type: TxType::Eip2930,
1475 cumulative_gas_used: 30_000,
1476 logs: vec![mock_log.clone()],
1477 success: false,
1478 };
1479
1480 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1481 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1482
1483 let eth_api = build_test_eth_api(provider);
1484
1485 let eth_filter = super::EthFilter::new(
1486 eth_api,
1487 EthFilterConfig::default(),
1488 Box::new(TokioTaskExecutor::default()),
1489 );
1490 let filter_inner = eth_filter.inner;
1491
1492 let headers = vec![
1493 SealedHeader::new(header_1, block_hash_1),
1494 SealedHeader::new(header_2, block_hash_2),
1495 SealedHeader::new(header_3, block_hash_3),
1496 ];
1497
1498 let mut range_mode = RangeBlockMode {
1499 filter_inner,
1500 iter: headers.into_iter().peekable(),
1501 next: VecDeque::new(),
1502 max_range: 3, pending_tasks: FuturesOrdered::new(),
1504 };
1505
1506 let result = range_mode.next().await;
1508 assert!(result.is_ok());
1509 let receipt_result = result.unwrap().unwrap();
1510
1511 assert_eq!(receipt_result.header.hash(), block_hash_1);
1512 assert_eq!(receipt_result.header.number, 100);
1513 assert_eq!(receipt_result.receipts.len(), 2);
1514
1515 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1517 assert_eq!(
1518 receipt_result.receipts[0].cumulative_gas_used,
1519 receipt_100_1.cumulative_gas_used
1520 );
1521 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1522
1523 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1524 assert_eq!(
1525 receipt_result.receipts[1].cumulative_gas_used,
1526 receipt_100_2.cumulative_gas_used
1527 );
1528 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1529
1530 let result2 = range_mode.next().await;
1532 assert!(result2.is_ok());
1533 let receipt_result2 = result2.unwrap().unwrap();
1534
1535 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1536 assert_eq!(receipt_result2.header.number, 101);
1537 assert_eq!(receipt_result2.receipts.len(), 1);
1538
1539 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1541 assert_eq!(
1542 receipt_result2.receipts[0].cumulative_gas_used,
1543 receipt_101_1.cumulative_gas_used
1544 );
1545 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1546
1547 let result3 = range_mode.next().await;
1549 assert!(result3.is_ok());
1550 assert!(result3.unwrap().is_none());
1551 }
1552
1553 #[tokio::test]
1554 async fn test_range_block_mode_iterator_exhaustion() {
1555 let provider = MockEthProvider::default();
1556
1557 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1558 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1559
1560 let block_hash_100 = FixedBytes::random();
1561 let block_hash_101 = FixedBytes::random();
1562
1563 provider.add_header(block_hash_100, header_100.clone());
1565 provider.add_header(block_hash_101, header_101.clone());
1566
1567 let mock_receipt = reth_ethereum_primitives::Receipt {
1569 tx_type: TxType::Legacy,
1570 cumulative_gas_used: 21_000,
1571 logs: vec![],
1572 success: true,
1573 };
1574 provider.add_receipts(100, vec![mock_receipt.clone()]);
1575 provider.add_receipts(101, vec![mock_receipt.clone()]);
1576
1577 let eth_api = build_test_eth_api(provider);
1578
1579 let eth_filter = super::EthFilter::new(
1580 eth_api,
1581 EthFilterConfig::default(),
1582 Box::new(TokioTaskExecutor::default()),
1583 );
1584 let filter_inner = eth_filter.inner;
1585
1586 let headers = vec![
1587 SealedHeader::new(header_100, block_hash_100),
1588 SealedHeader::new(header_101, block_hash_101),
1589 ];
1590
1591 let mut range_mode = RangeBlockMode {
1592 filter_inner,
1593 iter: headers.into_iter().peekable(),
1594 next: VecDeque::new(),
1595 max_range: 1,
1596 pending_tasks: FuturesOrdered::new(),
1597 };
1598
1599 let result1 = range_mode.next().await;
1600 assert!(result1.is_ok());
1601 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1606 assert!(result2.is_ok());
1607 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1611
1612 let result3 = range_mode.next().await;
1614 assert!(result3.is_ok());
1615 assert!(result3.unwrap().is_none());
1616 }
1617
1618 #[tokio::test]
1619 async fn test_cached_mode_with_mock_receipts() {
1620 let test_hash = FixedBytes::from([42u8; 32]);
1622 let test_block_number = 100u64;
1623 let test_header = SealedHeader::new(
1624 alloy_consensus::Header {
1625 number: test_block_number,
1626 gas_used: 50_000,
1627 ..Default::default()
1628 },
1629 test_hash,
1630 );
1631
1632 let mock_log = alloy_primitives::Log {
1634 address: alloy_primitives::Address::ZERO,
1635 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1636 };
1637
1638 let mock_receipt = reth_ethereum_primitives::Receipt {
1639 tx_type: TxType::Legacy,
1640 cumulative_gas_used: 21_000,
1641 logs: vec![mock_log],
1642 success: true,
1643 };
1644
1645 let provider = MockEthProvider::default();
1646 provider.add_header(test_hash, test_header.header().clone());
1647 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1648
1649 let eth_api = build_test_eth_api(provider);
1650 let eth_filter = super::EthFilter::new(
1651 eth_api,
1652 EthFilterConfig::default(),
1653 Box::new(TokioTaskExecutor::default()),
1654 );
1655 let filter_inner = eth_filter.inner;
1656
1657 let headers = vec![test_header.clone()];
1658
1659 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1660
1661 let result = cached_mode.next().await.expect("next should succeed");
1663 let receipt_block_result = result.expect("should have receipt result");
1664 assert_eq!(receipt_block_result.header.hash(), test_hash);
1665 assert_eq!(receipt_block_result.header.number, test_block_number);
1666 assert_eq!(receipt_block_result.receipts.len(), 1);
1667 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1668 assert_eq!(
1669 receipt_block_result.receipts[0].cumulative_gas_used,
1670 mock_receipt.cumulative_gas_used
1671 );
1672 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1673
1674 let result2 = cached_mode.next().await;
1676 assert!(result2.is_ok());
1677 assert!(result2.unwrap().is_none());
1678 }
1679
1680 #[tokio::test]
1681 async fn test_cached_mode_empty_headers() {
1682 let provider = MockEthProvider::default();
1683 let eth_api = build_test_eth_api(provider);
1684
1685 let eth_filter = super::EthFilter::new(
1686 eth_api,
1687 EthFilterConfig::default(),
1688 Box::new(TokioTaskExecutor::default()),
1689 );
1690 let filter_inner = eth_filter.inner;
1691
1692 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1693
1694 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1695
1696 let result = cached_mode.next().await.expect("next should succeed");
1698 assert!(result.is_none());
1699 }
1700
1701 #[tokio::test]
1702 async fn test_non_consecutive_headers_after_bloom_filter() {
1703 let provider = MockEthProvider::default();
1704
1705 let mut expected_hashes = vec![];
1707 let mut prev_hash = alloy_primitives::B256::default();
1708
1709 use alloy_consensus::TxLegacy;
1711 use reth_ethereum_primitives::{TransactionSigned, TxType};
1712
1713 let tx_inner = TxLegacy {
1714 chain_id: Some(1),
1715 nonce: 0,
1716 gas_price: 21_000,
1717 gas_limit: 21_000,
1718 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1719 value: alloy_primitives::U256::ZERO,
1720 input: alloy_primitives::Bytes::new(),
1721 };
1722 let signature = alloy_primitives::Signature::test_signature();
1723 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1724
1725 for i in 100u64..=103 {
1726 let header = alloy_consensus::Header {
1727 number: i,
1728 parent_hash: prev_hash,
1729 logs_bloom: if i == 100 || i == 102 {
1731 alloy_primitives::Bloom::from([1u8; 256])
1732 } else {
1733 alloy_primitives::Bloom::default()
1734 },
1735 ..Default::default()
1736 };
1737
1738 let hash = header.hash_slow();
1739 expected_hashes.push(hash);
1740 prev_hash = hash;
1741
1742 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1744
1745 let block = reth_ethereum_primitives::Block {
1746 header,
1747 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1748 };
1749 provider.add_block(hash, block);
1750 }
1751
1752 let mock_log = alloy_primitives::Log {
1754 address: alloy_primitives::Address::ZERO,
1755 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1756 };
1757
1758 let receipt = reth_ethereum_primitives::Receipt {
1759 tx_type: TxType::Legacy,
1760 cumulative_gas_used: 21_000,
1761 logs: vec![mock_log],
1762 success: true,
1763 };
1764
1765 provider.add_receipts(100, vec![receipt.clone()]);
1766 provider.add_receipts(101, vec![]);
1767 provider.add_receipts(102, vec![receipt.clone()]);
1768 provider.add_receipts(103, vec![]);
1769
1770 use reth_db_api::models::StoredBlockBodyIndices;
1772 provider
1773 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1774 provider
1775 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1776 provider
1777 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1778 provider
1779 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1780
1781 let eth_api = build_test_eth_api(provider);
1782 let eth_filter = EthFilter::new(
1783 eth_api,
1784 EthFilterConfig::default(),
1785 Box::new(TokioTaskExecutor::default()),
1786 );
1787
1788 let filter = Filter::default();
1790
1791 let logs = eth_filter
1793 .inner
1794 .clone()
1795 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1796 .await
1797 .expect("should succeed");
1798
1799 assert_eq!(logs.len(), 2);
1801
1802 assert_eq!(logs[0].block_number, Some(100));
1803 assert_eq!(logs[1].block_number, Some(102));
1804
1805 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1809}