1use alloy_consensus::BlockHeader;
4use alloy_primitives::{Sealable, TxHash};
5use alloy_rpc_types_eth::{
6 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7 PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::{
11 future::TryFutureExt,
12 stream::{FuturesOrdered, StreamExt},
13 Future,
14};
15use itertools::Itertools;
16use jsonrpsee::{core::RpcResult, server::IdProvider};
17use reth_errors::ProviderError;
18use reth_primitives_traits::{NodePrimitives, SealedHeader};
19use reth_rpc_eth_api::{
20 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
21 RpcNodeCoreExt, RpcTransaction,
22};
23use reth_rpc_eth_types::{
24 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
25 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
26};
27use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
28use reth_storage_api::{
29 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
30 ProviderReceipt, ReceiptProvider,
31};
32use reth_tasks::TaskSpawner;
33use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
34use std::{
35 collections::{HashMap, VecDeque},
36 fmt,
37 iter::{Peekable, StepBy},
38 ops::RangeInclusive,
39 pin::Pin,
40 sync::Arc,
41 time::{Duration, Instant},
42};
43use tokio::{
44 sync::{mpsc::Receiver, oneshot, Mutex},
45 time::MissedTickBehavior,
46};
47use tracing::{debug, error, trace};
48
49impl<Eth> EngineEthFilter for EthFilter<Eth>
50where
51 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
52{
53 fn logs(
55 &self,
56 filter: Filter,
57 limits: QueryLimits,
58 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
59 trace!(target: "rpc::eth", "Serving eth_getLogs");
60 self.logs_for_filter(filter, limits).map_err(|e| e.into())
61 }
62}
63
/// Base threshold for choosing cached mode over range mode: both the number of requested
/// blocks and the distance of the range end from the chain tip must not exceed the
/// (possibly bloom-adjusted) threshold derived from this value.
const CACHED_MODE_BLOCK_THRESHOLD: u64 = 250;

/// When more than this many headers match the bloom filter, the cached-mode threshold is
/// halved (only for ranges larger than [`BLOOM_ADJUSTMENT_MIN_BLOCKS`]).
const HIGH_BLOOM_MATCH_THRESHOLD: usize = 20;

/// When more than this many headers match the bloom filter, the cached-mode threshold is
/// reduced to three quarters (only for ranges larger than [`BLOOM_ADJUSTMENT_MIN_BLOCKS`]).
const MODERATE_BLOOM_MATCH_THRESHOLD: usize = 10;

/// Ranges with at most this many blocks always use the unadjusted
/// [`CACHED_MODE_BLOCK_THRESHOLD`].
const BLOOM_ADJUSTMENT_MIN_BLOCKS: u64 = 100;

/// Step used when splitting a block range into header range requests; also the maximum
/// number of consecutive headers grouped into one batch in range mode.
const MAX_HEADERS_RANGE: u64 = 1_000;

/// In range mode, receipts are fetched through parallel blocking tasks once the number of
/// headers still to process (including the current batch) reaches this value.
const PARALLEL_PROCESSING_THRESHOLD: usize = 1000;

/// Divisor used to derive the chunk size when a batch of headers is split for parallel
/// receipt fetching.
const DEFAULT_PARALLEL_CONCURRENCY: usize = 4;
84
/// Handler for the `eth_` filter RPC methods.
///
/// All state lives behind an [`Arc`], so clones share the same installed filters.
pub struct EthFilter<Eth: EthApiTypes> {
    /// Shared state of the filter handler.
    inner: Arc<EthFilterInner<Eth>>,
}
92
93impl<Eth> Clone for EthFilter<Eth>
94where
95 Eth: EthApiTypes,
96{
97 fn clone(&self) -> Self {
98 Self { inner: self.inner.clone() }
99 }
100}
101
102impl<Eth> EthFilter<Eth>
103where
104 Eth: EthApiTypes + 'static,
105{
106 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
134 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
135 config;
136 let inner = EthFilterInner {
137 eth_api,
138 active_filters: ActiveFilters::new(),
139 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
140 max_headers_range: MAX_HEADERS_RANGE,
141 task_spawner,
142 stale_filter_ttl,
143 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
144 };
145
146 let eth_filter = Self { inner: Arc::new(inner) };
147
148 let this = eth_filter.clone();
149 eth_filter.inner.task_spawner.spawn_critical(
150 "eth-filters_stale-filters-clean",
151 Box::pin(async move {
152 this.watch_and_clear_stale_filters().await;
153 }),
154 );
155
156 eth_filter
157 }
158
    /// Returns a reference to the set of currently installed filters.
    pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
        &self.inner.active_filters
    }
163
164 async fn watch_and_clear_stale_filters(&self) {
167 let mut interval = tokio::time::interval_at(
168 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
169 self.inner.stale_filter_ttl,
170 );
171 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
172 loop {
173 interval.tick().await;
174 self.clear_stale_filters(Instant::now()).await;
175 }
176 }
177
178 pub async fn clear_stale_filters(&self, now: Instant) {
181 trace!(target: "rpc::eth", "clear stale filters");
182 self.active_filters().inner.lock().await.retain(|id, filter| {
183 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
184
185 if !is_valid {
186 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
187 }
188
189 is_valid
190 })
191 }
192}
193
194impl<Eth> EthFilter<Eth>
195where
196 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
197{
    /// Returns the provider of the wrapped eth API.
    fn provider(&self) -> &Eth::Provider {
        self.inner.eth_api.provider()
    }
202
    /// Returns the transaction pool of the wrapped eth API.
    fn pool(&self) -> &Eth::Pool {
        self.inner.eth_api.pool()
    }
207
    /// Returns all changes for the filter `id` since it was last polled.
    ///
    /// Depending on the filter kind this is the drained pending transactions, the canonical
    /// block hashes, or the logs matching the stored log filter.
    ///
    /// # Errors
    ///
    /// Returns [`EthFilterError::FilterNotFound`] if no filter with `id` is installed, and
    /// propagates provider errors and log query errors.
    pub async fn filter_changes(
        &self,
        id: FilterId,
    ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
        let info = self.provider().chain_info()?;
        let best_number = info.best_number;

        // `start_block` is the block number that was stored in the filter before this poll,
        // i.e. the first block that has not been reported yet.
        let (start_block, kind) = {
            // The lock is held only for this scope, not while changes are fetched.
            let mut filters = self.inner.active_filters.inner.lock().await;
            let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;

            if filter.block > best_number {
                // No new block since the last poll.
                // NOTE(review): this early return happens before the filter kind is inspected
                // and before `last_poll_timestamp` is refreshed, so it also applies to
                // pending transaction filters — confirm this is intended.
                return Ok(FilterChanges::Empty)
            }

            // Advance the filter to `best_number + 1`; after the swap `block` holds the
            // previously stored block number.
            let mut block = best_number + 1;
            std::mem::swap(&mut filter.block, &mut block);
            filter.last_poll_timestamp = Instant::now();

            (block, filter.kind.clone())
        };

        match kind {
            FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
            FilterKind::Block => {
                // Presumably `canonical_hashes_range` treats the end as exclusive, hence the
                // `+ 1` to include `best_number` — confirm against the provider API.
                let end_block = best_number + 1;
                let block_hashes =
                    self.provider().canonical_hashes_range(start_block, end_block).map_err(
                        |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
                    )?;
                Ok(FilterChanges::Hashes(block_hashes))
            }
            FilterKind::Log(filter) => {
                let (from_block_number, to_block_number) = match filter.block_option {
                    FilterBlockOption::Range { from_block, to_block } => {
                        // Resolve the optional block tags to concrete block numbers.
                        let from = from_block
                            .map(|num| self.provider().convert_block_number(num))
                            .transpose()?
                            .flatten();
                        let to = to_block
                            .map(|num| self.provider().convert_block_number(num))
                            .transpose()?
                            .flatten();
                        logs_utils::get_filter_block_range(from, to, start_block, info)
                    }
                    FilterBlockOption::AtBlockHash(_) => {
                        // The block hash is not consulted here; the unreported range
                        // `start_block..=best_number` is queried instead.
                        (start_block, best_number)
                    }
                };
                let logs = self
                    .inner
                    .clone()
                    .get_logs_in_block_range(
                        *filter,
                        from_block_number,
                        to_block_number,
                        self.inner.query_limits,
                    )
                    .await?;
                Ok(FilterChanges::Logs(logs))
            }
        }
    }
283
284 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
290 let filter = {
291 let filters = self.inner.active_filters.inner.lock().await;
292 if let FilterKind::Log(ref filter) =
293 filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
294 {
295 *filter.clone()
296 } else {
297 return Err(EthFilterError::FilterNotFound(id))
299 }
300 };
301
302 self.logs_for_filter(filter, self.inner.query_limits).await
303 }
304
    /// Returns the logs matching `filter`, delegating to the shared inner state.
    async fn logs_for_filter(
        &self,
        filter: Filter,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        // The inner method takes `Arc<Self>`, so a handle is cloned for the call.
        self.inner.clone().logs_for_filter(filter, limits).await
    }
313}
314
315#[async_trait]
316impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
317where
318 Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
319{
320 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
322 trace!(target: "rpc::eth", "Serving eth_newFilter");
323 self.inner
324 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
325 .await
326 }
327
    /// Handler for `eth_newBlockFilter`: installs a block filter and returns its id.
    async fn new_block_filter(&self) -> RpcResult<FilterId> {
        trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
        self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
    }
333
334 async fn new_pending_transaction_filter(
336 &self,
337 kind: Option<PendingTransactionFilterKind>,
338 ) -> RpcResult<FilterId> {
339 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
340
341 let transaction_kind = match kind.unwrap_or_default() {
342 PendingTransactionFilterKind::Hashes => {
343 let receiver = self.pool().pending_transactions_listener();
344 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
345 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
346 }
347 PendingTransactionFilterKind::Full => {
348 let stream = self.pool().new_pending_pool_transactions_listener();
349 let full_txs_receiver = FullTransactionsReceiver::new(
350 stream,
351 dyn_clone::clone(self.inner.eth_api.tx_resp_builder()),
352 );
353 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
354 full_txs_receiver,
355 )))
356 }
357 };
358
359 self.inner.install_filter(transaction_kind).await
363 }
364
365 async fn filter_changes(
367 &self,
368 id: FilterId,
369 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
370 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
371 Ok(Self::filter_changes(self, id).await?)
372 }
373
374 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
380 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
381 Ok(Self::filter_logs(self, id).await?)
382 }
383
384 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
386 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
387 let mut filters = self.inner.active_filters.inner.lock().await;
388 if filters.remove(&id).is_some() {
389 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
390 Ok(true)
391 } else {
392 Ok(false)
393 }
394 }
395
396 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
400 trace!(target: "rpc::eth", "Serving eth_getLogs");
401 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
402 }
403}
404
405impl<Eth> std::fmt::Debug for EthFilter<Eth>
406where
407 Eth: EthApiTypes,
408{
409 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410 f.debug_struct("EthFilter").finish_non_exhaustive()
411 }
412}
413
/// Shared state behind [`EthFilter`].
#[derive(Debug)]
struct EthFilterInner<Eth: EthApiTypes> {
    /// The eth API that gives access to the provider, cache and transaction pool.
    eth_api: Eth,
    /// All currently installed filters.
    active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
    /// Generates the ids of newly installed filters.
    id_provider: Arc<dyn IdProvider>,
    /// Limits applied to log queries served through installed filters and `eth_getLogs`.
    query_limits: QueryLimits,
    /// Step used when reading headers of a block range, and batch size in range mode.
    max_headers_range: u64,
    /// Spawner used for the stale filter eviction task and for log range queries.
    task_spawner: Box<dyn TaskSpawner>,
    /// Duration after which a filter that has not been polled is evicted.
    stale_filter_ttl: Duration,
}
432
433impl<Eth> EthFilterInner<Eth>
434where
435 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
436 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
437 + 'static,
438{
    /// Returns the provider of the wrapped eth API.
    fn provider(&self) -> &Eth::Provider {
        self.eth_api.provider()
    }
443
    /// Returns the state cache of the wrapped eth API.
    fn eth_cache(&self) -> &EthStateCache<Eth::Primitives> {
        self.eth_api.cache()
    }
448
449 async fn logs_for_filter(
451 self: Arc<Self>,
452 filter: Filter,
453 limits: QueryLimits,
454 ) -> Result<Vec<Log>, EthFilterError> {
455 match filter.block_option {
456 FilterBlockOption::AtBlockHash(block_hash) => {
457 let header = self
460 .provider()
461 .header_by_hash_or_number(block_hash.into())?
462 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
463
464 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
465
466 let (receipts, maybe_block) = self
469 .eth_cache()
470 .get_receipts_and_maybe_block(block_num_hash.hash)
471 .await?
472 .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
473
474 let mut all_logs = Vec::new();
475 append_matching_block_logs(
476 &mut all_logs,
477 maybe_block
478 .map(ProviderOrBlock::Block)
479 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
480 &filter,
481 block_num_hash,
482 &receipts,
483 false,
484 header.timestamp(),
485 )?;
486
487 Ok(all_logs)
488 }
489 FilterBlockOption::Range { from_block, to_block } => {
490 let info = self.provider().chain_info()?;
492
493 let start_block = info.best_number;
495 let from = from_block
496 .map(|num| self.provider().convert_block_number(num))
497 .transpose()?
498 .flatten();
499 let to = to_block
500 .map(|num| self.provider().convert_block_number(num))
501 .transpose()?
502 .flatten();
503
504 if let Some(f) = from {
505 if f > info.best_number {
506 return Ok(Vec::new());
508 }
509 }
510
511 let (from_block_number, to_block_number) =
512 logs_utils::get_filter_block_range(from, to, start_block, info);
513
514 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
515 .await
516 }
517 }
518 }
519
520 async fn install_filter(
522 &self,
523 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
524 ) -> RpcResult<FilterId> {
525 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
526 let subscription_id = self.id_provider.next_id();
527
528 let id = match subscription_id {
529 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
530 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
531 };
532 let mut filters = self.active_filters.inner.lock().await;
533 filters.insert(
534 id.clone(),
535 ActiveFilter {
536 block: last_poll_block_number,
537 last_poll_timestamp: Instant::now(),
538 kind,
539 },
540 );
541 Ok(id)
542 }
543
544 async fn get_logs_in_block_range(
550 self: Arc<Self>,
551 filter: Filter,
552 from_block: u64,
553 to_block: u64,
554 limits: QueryLimits,
555 ) -> Result<Vec<Log>, EthFilterError> {
556 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
557
558 if to_block < from_block {
560 return Err(EthFilterError::InvalidBlockRangeParams)
561 }
562
563 if let Some(max_blocks_per_filter) =
564 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
565 {
566 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
567 }
568
569 let (tx, rx) = oneshot::channel();
570 let this = self.clone();
571 self.task_spawner.spawn_blocking(Box::pin(async move {
572 let res =
573 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
574 let _ = tx.send(res);
575 }));
576
577 rx.await.map_err(|_| EthFilterError::InternalError)?
578 }
579
    /// Collects all logs matching `filter` in the inclusive range `from_block..=to_block`.
    ///
    /// Works in two phases:
    /// 1. Headers are read in windows stepped by `max_headers_range`, and only headers whose
    ///    logs bloom matches the filter are kept.
    /// 2. A [`RangeMode`] yields the receipts (and the block, if available) for the kept
    ///    headers, and the matching logs of each block are appended to the result.
    ///
    /// # Errors
    ///
    /// Returns [`EthFilterError::QueryExceedsMaxResults`] if the range spans more than one
    /// block and the collected logs exceed `limits.max_logs_per_response`. Provider and
    /// cache errors are propagated.
    async fn get_logs_in_block_range_inner(
        self: Arc<Self>,
        filter: &Filter,
        from_block: u64,
        to_block: u64,
        limits: QueryLimits,
    ) -> Result<Vec<Log>, EthFilterError> {
        let mut all_logs = Vec::new();
        let mut matching_headers = Vec::new();

        // The chain tip is needed to decide between cached and range mode below.
        let chain_tip = self.provider().best_block_number()?;

        // Phase 1: collect every header whose bloom matches the filter.
        for (from, to) in
            BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
        {
            let headers = self.provider().headers_range(from..=to)?;

            let mut headers_iter = headers.into_iter().peekable();

            while let Some(header) = headers_iter.next() {
                if !filter.matches_bloom(header.logs_bloom()) {
                    continue
                }

                let current_number = header.number();

                let block_hash = match headers_iter.peek() {
                    Some(next_header) if next_header.number() == current_number + 1 => {
                        // The next header is the direct child, so its parent hash is this
                        // header's hash and no hashing is required.
                        next_header.parent_hash()
                    }
                    _ => {
                        // Last header of the window or a gap: compute the hash.
                        header.hash_slow()
                    }
                };

                matching_headers.push(SealedHeader::new(header, block_hash));
            }
        }

        // Phase 2: pick the processing mode based on the collected headers and the range.
        let mut range_mode = RangeMode::new(
            self.clone(),
            matching_headers,
            from_block,
            to_block,
            self.max_headers_range,
            chain_tip,
        );

        // Pull receipts block by block and append the logs that match the filter.
        while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
            range_mode.next().await?
        {
            let num_hash = header.num_hash();
            append_matching_block_logs(
                &mut all_logs,
                recovered_block
                    .map(ProviderOrBlock::Block)
                    .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
                filter,
                num_hash,
                &receipts,
                false,
                header.timestamp(),
            )?;

            // The size limit is only enforced for multi-block ranges, so all logs of a
            // single requested block are always returned.
            let is_multi_block_range = from_block != to_block;
            if let Some(max_logs_per_response) = limits.max_logs_per_response {
                if is_multi_block_range && all_logs.len() > max_logs_per_response {
                    debug!(
                        target: "rpc::eth::filter",
                        logs_found = all_logs.len(),
                        max_logs_per_response,
                        from_block,
                        to_block = num_hash.number.saturating_sub(1),
                        "Query exceeded max logs per response limit"
                    );
                    // The suggested retry range ends just before the block that pushed the
                    // result over the limit.
                    return Err(EthFilterError::QueryExceedsMaxResults {
                        max_logs: max_logs_per_response,
                        from_block,
                        to_block: num_hash.number.saturating_sub(1),
                    });
                }
            }
        }

        Ok(all_logs)
    }
682}
683
684#[derive(Debug, Clone, Default)]
686pub struct ActiveFilters<T> {
687 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
688}
689
690impl<T> ActiveFilters<T> {
691 pub fn new() -> Self {
693 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
694 }
695}
696
/// An installed filter together with its polling state.
#[derive(Debug)]
struct ActiveFilter<T> {
    /// Block number from which the next poll starts reporting changes.
    block: u64,
    /// Time of the last poll (or of installation), used to detect stale filters.
    last_poll_timestamp: Instant,
    /// What the filter reports.
    kind: FilterKind<T>,
}
707
708#[derive(Debug, Clone)]
710struct PendingTransactionsReceiver {
711 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
712}
713
714impl PendingTransactionsReceiver {
715 fn new(receiver: Receiver<TxHash>) -> Self {
716 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
717 }
718
719 async fn drain<T>(&self) -> FilterChanges<T> {
721 let mut pending_txs = Vec::new();
722 let mut prepared_stream = self.txs_receiver.lock().await;
723
724 while let Ok(tx_hash) = prepared_stream.try_recv() {
725 pending_txs.push(tx_hash);
726 }
727
728 FilterChanges::Hashes(pending_txs)
730 }
731}
732
733#[derive(Debug, Clone)]
735struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
736 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
737 tx_resp_builder: TxCompat,
738}
739
740impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
741where
742 T: PoolTransaction + 'static,
743 TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
744{
745 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
747 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
748 }
749
750 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
752 let mut pending_txs = Vec::new();
753 let mut prepared_stream = self.txs_stream.lock().await;
754
755 while let Ok(tx) = prepared_stream.try_recv() {
756 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
757 Ok(tx) => pending_txs.push(tx),
758 Err(err) => {
759 error!(target: "rpc",
760 %err,
761 "Failed to fill txn with block context"
762 );
763 }
764 }
765 }
766 FilterChanges::Transactions(pending_txs)
767 }
768}
769
/// Object-safe abstraction over a receiver of full pending transactions, so filters can
/// store it as a trait object without naming the pool transaction and converter types.
#[async_trait]
trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
    /// Returns the transactions that are currently available.
    async fn drain(&self) -> FilterChanges<T>;
}

#[async_trait]
impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
    for FullTransactionsReceiver<T, TxCompat>
where
    T: PoolTransaction + 'static,
    TxCompat: RpcConvert<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
{
    /// Forwards to [`FullTransactionsReceiver::drain`].
    async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
        Self::drain(self).await
    }
}
787
/// The two flavours of pending transaction filters.
#[derive(Debug, Clone)]
enum PendingTransactionKind<T> {
    /// Polls return transaction hashes.
    Hashes(PendingTransactionsReceiver),
    /// Polls return full transactions.
    FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
}

impl<T: 'static> PendingTransactionKind<T> {
    /// Drains the underlying receiver and returns what is currently available.
    async fn drain(&self) -> FilterChanges<T> {
        match self {
            Self::Hashes(receiver) => receiver.drain().await,
            Self::FullTransaction(receiver) => receiver.drain().await,
        }
    }
}
807
/// What an installed filter reports when polled.
#[derive(Clone, Debug)]
enum FilterKind<T> {
    /// Logs matching the boxed filter.
    Log(Box<Filter>),
    /// Hashes of new blocks.
    Block,
    /// Pending transactions, as hashes or full transactions.
    PendingTransaction(PendingTransactionKind<T>),
}
814
/// Iterator that splits an inclusive block range into consecutive, non-overlapping
/// inclusive sub-ranges `(start, end)` that each span at most `step + 1` blocks.
#[derive(Debug)]
struct BlockRangeInclusiveIter {
    /// Yields the start of every sub-range.
    iter: StepBy<RangeInclusive<u64>>,
    /// Distance between the start and the end of a full sub-range.
    step: u64,
    /// Inclusive end of the whole range; the last sub-range is clamped to it.
    end: u64,
}

impl BlockRangeInclusiveIter {
    /// Creates the iterator for `range`, producing sub-ranges of the form
    /// `start..=start + step`.
    ///
    /// `step` is clamped so that `step + 1` always fits into a `usize`; for such extreme
    /// values the sub-ranges are merely slightly smaller than requested.
    fn new(range: RangeInclusive<u64>, step: u64) -> Self {
        let max_step = u64::try_from(usize::MAX - 1).unwrap_or(u64::MAX - 1);
        let step = step.min(max_step);
        // Cannot truncate or overflow thanks to the clamp above.
        let stride = step as usize + 1;
        Self { end: *range.end(), iter: range.step_by(stride), step }
    }
}

impl Iterator for BlockRangeInclusiveIter {
    type Item = (u64, u64);

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.iter.next()?;
        // Saturate instead of overflowing for ranges close to `u64::MAX`. `start` never
        // exceeds `self.end`, so the clamped end is always at least `start`.
        let end = start.saturating_add(self.step).min(self.end);
        Some((start, end))
    }
}
841
/// Errors that can occur while serving filter requests.
#[derive(Debug, thiserror::Error)]
pub enum EthFilterError {
    /// No matching filter is installed under the given id.
    #[error("filter not found")]
    FilterNotFound(FilterId),
    /// The requested block range is invalid, e.g. the end lies before the start.
    #[error("invalid block range params")]
    InvalidBlockRangeParams,
    /// The requested block range is wider than the configured maximum.
    #[error("query exceeds max block range {0}")]
    QueryExceedsMaxBlocks(u64),
    /// The query produced more logs than the configured maximum.
    #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
    QueryExceedsMaxResults {
        /// Maximum number of logs allowed per response.
        max_logs: usize,
        /// Start of the range suggested for a retry.
        from_block: u64,
        /// End of the range suggested for a retry.
        to_block: u64,
    },
    /// Error from the `eth_` API layer.
    #[error(transparent)]
    EthAPIError(#[from] EthApiError),
    /// A spawned task failed to deliver its result.
    #[error("internal filter error")]
    InternalError,
}
871
872impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
873 fn from(err: EthFilterError) -> Self {
874 match err {
875 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
876 jsonrpsee::types::error::INVALID_PARAMS_CODE,
877 "filter not found",
878 ),
879 err @ EthFilterError::InternalError => {
880 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
881 }
882 EthFilterError::EthAPIError(err) => err.into(),
883 err @ (EthFilterError::InvalidBlockRangeParams |
884 EthFilterError::QueryExceedsMaxBlocks(_) |
885 EthFilterError::QueryExceedsMaxResults { .. }) => {
886 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
887 }
888 }
889 }
890}
891
892impl From<ProviderError> for EthFilterError {
893 fn from(err: ProviderError) -> Self {
894 Self::EthAPIError(err.into())
895 }
896}
897
/// Receipts of a single block together with its header and, if available, the block itself.
struct ReceiptBlockResult<P>
where
    P: ReceiptProvider + BlockReader,
{
    /// All receipts of the block.
    receipts: Arc<Vec<ProviderReceipt<P>>>,
    /// The recovered block, when it was obtained alongside the receipts.
    recovered_block: Option<Arc<reth_primitives_traits::RecoveredBlock<ProviderBlock<P>>>>,
    /// The sealed header of the block.
    header: SealedHeader<<P as HeaderProvider>::Header>,
}
911
/// Strategy used to obtain receipts for the headers of a log range query.
enum RangeMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Receipts are requested block by block through the state cache.
    Cached(CachedMode<Eth>),
    /// Headers are processed in batches of consecutive blocks.
    Range(RangeBlockMode<Eth>),
}
921
922impl<
923 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
924 > RangeMode<Eth>
925{
926 fn new(
928 filter_inner: Arc<EthFilterInner<Eth>>,
929 sealed_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
930 from_block: u64,
931 to_block: u64,
932 max_headers_range: u64,
933 chain_tip: u64,
934 ) -> Self {
935 let block_count = to_block - from_block + 1;
936 let distance_from_tip = chain_tip.saturating_sub(to_block);
937
938 let use_cached_mode =
940 Self::should_use_cached_mode(&sealed_headers, block_count, distance_from_tip);
941
942 if use_cached_mode && !sealed_headers.is_empty() {
943 Self::Cached(CachedMode { filter_inner, headers_iter: sealed_headers.into_iter() })
944 } else {
945 Self::Range(RangeBlockMode {
946 filter_inner,
947 iter: sealed_headers.into_iter().peekable(),
948 next: VecDeque::new(),
949 max_range: max_headers_range as usize,
950 pending_tasks: FuturesOrdered::new(),
951 })
952 }
953 }
954
955 const fn should_use_cached_mode(
957 headers: &[SealedHeader<<Eth::Provider as HeaderProvider>::Header>],
958 block_count: u64,
959 distance_from_tip: u64,
960 ) -> bool {
961 let bloom_matches = headers.len();
963
964 let adjusted_threshold = Self::calculate_adjusted_threshold(block_count, bloom_matches);
966
967 block_count <= adjusted_threshold && distance_from_tip <= adjusted_threshold
968 }
969
970 const fn calculate_adjusted_threshold(block_count: u64, bloom_matches: usize) -> u64 {
972 if block_count <= BLOOM_ADJUSTMENT_MIN_BLOCKS {
974 return CACHED_MODE_BLOCK_THRESHOLD;
975 }
976
977 match bloom_matches {
978 n if n > HIGH_BLOOM_MATCH_THRESHOLD => CACHED_MODE_BLOCK_THRESHOLD / 2,
979 n if n > MODERATE_BLOOM_MATCH_THRESHOLD => (CACHED_MODE_BLOCK_THRESHOLD * 3) / 4,
980 _ => CACHED_MODE_BLOCK_THRESHOLD,
981 }
982 }
983
    /// Returns the next block result from the active mode, or `None` once all headers have
    /// been processed.
    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
        match self {
            Self::Cached(cached) => cached.next().await,
            Self::Range(range) => range.next().await,
        }
    }
991}
992
/// Mode that fetches receipts for each header through the state cache.
struct CachedMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state giving access to the cache.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Remaining bloom-matched headers to process.
    headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}
1000
1001impl<
1002 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1003 > CachedMode<Eth>
1004{
1005 async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1006 for header in self.headers_iter.by_ref() {
1007 if let Some((receipts, maybe_block)) =
1009 self.filter_inner.eth_cache().get_receipts_and_maybe_block(header.hash()).await?
1010 {
1011 return Ok(Some(ReceiptBlockResult {
1012 receipts,
1013 recovered_block: maybe_block,
1014 header,
1015 }));
1016 }
1017 }
1018
1019 Ok(None) }
1021}
1022
/// Boxed future that resolves to the block results of one chunk of headers.
type ReceiptFetchFuture<P> =
    Pin<Box<dyn Future<Output = Result<Vec<ReceiptBlockResult<P>>, EthFilterError>> + Send>>;
1026
/// Mode that processes headers in batches of consecutive blocks, sequentially or through
/// parallel tasks.
struct RangeBlockMode<
    Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
> {
    /// Shared filter state giving access to the cache and the provider.
    filter_inner: Arc<EthFilterInner<Eth>>,
    /// Remaining bloom-matched headers to process.
    iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
    /// Buffer of results that are ready to be returned.
    next: VecDeque<ReceiptBlockResult<Eth::Provider>>,
    /// Maximum number of headers collected into one batch.
    max_range: usize,
    /// In-flight receipt fetch tasks, resolved in the order they were queued.
    pending_tasks: FuturesOrdered<ReceiptFetchFuture<Eth::Provider>>,
}
1038
1039impl<
1040 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
1041 > RangeBlockMode<Eth>
1042{
    /// Returns the next block result, or `None` once all headers have been processed.
    ///
    /// Each loop iteration first serves buffered results, then awaits pending tasks, and
    /// only then collects a new batch of consecutive headers to process.
    async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
        loop {
            // Serve already processed results first.
            if let Some(result) = self.next.pop_front() {
                return Ok(Some(result));
            }

            // Then wait for the next queued task and buffer its results.
            if let Some(task_result) = self.pending_tasks.next().await {
                self.next.extend(task_result?);
                continue;
            }

            // Nothing buffered or pending: start a new batch, or finish if no header is left.
            let Some(next_header) = self.iter.next() else {
                return Ok(None);
            };

            let mut range_headers = Vec::with_capacity(self.max_range);
            range_headers.push(next_header);

            // Extend the batch with directly following block numbers, up to `max_range`.
            while range_headers.len() < self.max_range {
                let Some(peeked) = self.iter.peek() else { break };
                let Some(last_header) = range_headers.last() else { break };

                let expected_next = last_header.number() + 1;
                if peeked.number() != expected_next {
                    debug!(
                        target: "rpc::eth::filter",
                        last_block = last_header.number(),
                        next_block = peeked.number(),
                        expected = expected_next,
                        range_size = range_headers.len(),
                        "Non-consecutive block detected, stopping range collection"
                    );
                    break;
                }

                let Some(next_header) = self.iter.next() else { break };
                range_headers.push(next_header);
            }

            // Headers still to process, including the batch that was just collected.
            let remaining_headers = self.iter.len() + range_headers.len();
            if remaining_headers >= PARALLEL_PROCESSING_THRESHOLD {
                // The spawned tasks are awaited in the next loop iteration.
                self.spawn_parallel_tasks(range_headers);
            } else {
                // Process sequentially; if the batch produced nothing, loop for more work.
                if let Some(result) = self.process_small_range(range_headers).await? {
                    return Ok(Some(result));
                }
            }
        }
    }
1101
1102 async fn process_small_range(
1106 &mut self,
1107 range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
1108 ) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
1109 for header in range_headers {
1111 let (maybe_block, maybe_receipts) = self
1113 .filter_inner
1114 .eth_cache()
1115 .maybe_cached_block_and_receipts(header.hash())
1116 .await?;
1117
1118 let receipts = match maybe_receipts {
1119 Some(receipts) => receipts,
1120 None => {
1121 match self.filter_inner.provider().receipts_by_block(header.hash().into())? {
1123 Some(receipts) => Arc::new(receipts),
1124 None => continue, }
1126 }
1127 };
1128
1129 if !receipts.is_empty() {
1130 self.next.push_back(ReceiptBlockResult {
1131 receipts,
1132 recovered_block: maybe_block,
1133 header,
1134 });
1135 }
1136 }
1137
1138 Ok(self.next.pop_front())
1139 }
1140
    /// Splits a batch of headers into chunks and queues one receipt fetch task per chunk.
    ///
    /// Each task reads the receipts of its chunk from the provider on a blocking thread.
    /// The tasks are pushed to `pending_tasks` in header order, so results come back in
    /// that order.
    fn spawn_parallel_tasks(
        &mut self,
        range_headers: Vec<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
    ) {
        // At least one header per chunk.
        let chunk_size = std::cmp::max(range_headers.len() / DEFAULT_PARALLEL_CONCURRENCY, 1);
        let header_chunks = range_headers
            .into_iter()
            .chunks(chunk_size)
            .into_iter()
            .map(|chunk| chunk.collect::<Vec<_>>())
            .collect::<Vec<_>>();

        for chunk_headers in header_chunks {
            let filter_inner = self.filter_inner.clone();
            let chunk_task = Box::pin(async move {
                let chunk_task = tokio::task::spawn_blocking(move || {
                    let mut chunk_results = Vec::new();

                    for header in chunk_headers {
                        // Read the receipts directly from the provider; the cache is not
                        // consulted here.
                        let receipts = match filter_inner
                            .provider()
                            .receipts_by_block(header.hash().into())?
                        {
                            Some(receipts) => Arc::new(receipts),
                            None => continue,
                        };

                        // Blocks without receipts cannot contain logs and are skipped.
                        if !receipts.is_empty() {
                            chunk_results.push(ReceiptBlockResult {
                                receipts,
                                recovered_block: None,
                                header,
                            });
                        }
                    }

                    Ok(chunk_results)
                });

                match chunk_task.await {
                    Ok(Ok(chunk_results)) => Ok(chunk_results),
                    Ok(Err(e)) => Err(e),
                    // The blocking task could not be joined.
                    Err(join_err) => {
                        trace!(target: "rpc::eth::filter", error = ?join_err, "Task join error");
                        Err(EthFilterError::InternalError)
                    }
                }
            });

            self.pending_tasks.push_back(chunk_task);
        }
    }
1202}
1203
1204#[cfg(test)]
1205mod tests {
1206 use super::*;
1207 use crate::{eth::EthApi, EthApiBuilder};
1208 use alloy_network::Ethereum;
1209 use alloy_primitives::FixedBytes;
1210 use rand::Rng;
1211 use reth_chainspec::{ChainSpec, ChainSpecProvider};
1212 use reth_ethereum_primitives::TxType;
1213 use reth_evm_ethereum::EthEvmConfig;
1214 use reth_network_api::noop::NoopNetwork;
1215 use reth_provider::test_utils::MockEthProvider;
1216 use reth_rpc_convert::RpcConverter;
1217 use reth_rpc_eth_api::node::RpcNodeCoreAdapter;
1218 use reth_rpc_eth_types::receipt::EthReceiptConverter;
1219 use reth_tasks::TokioTaskExecutor;
1220 use reth_testing_utils::generators;
1221 use reth_transaction_pool::test_utils::{testing_pool, TestPool};
1222 use std::{collections::VecDeque, sync::Arc};
1223
    /// Checks that the sub-ranges of a random range start at the range start, follow each
    /// other without gaps, and end at the range end.
    #[test]
    fn test_block_range_iter() {
        let mut rng = generators::rng();

        let start = rng.random::<u32>() as u64;
        let end = start.saturating_add(rng.random::<u32>() as u64);
        let step = rng.random::<u16>() as u64;
        let range = start..=end;
        let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
        // The first sub-range starts at the range start and is clamped to the range end.
        let (from, mut end) = iter.next().unwrap();
        assert_eq!(from, start);
        assert_eq!(end, (from + step).min(*range.end()));

        for (next_from, next_end) in iter {
            // Every following sub-range starts right after the previous one.
            assert_eq!(next_from, end + 1);
            end = next_end;
        }

        // The last sub-range ends exactly at the range end.
        assert_eq!(end, *range.end());
    }
1245
    /// Builds an [`EthApi`] for tests on top of the given mock provider, a testing pool and
    /// a no-op network.
    #[expect(clippy::type_complexity)]
    fn build_test_eth_api(
        provider: MockEthProvider,
    ) -> EthApi<
        RpcNodeCoreAdapter<MockEthProvider, TestPool, NoopNetwork, EthEvmConfig>,
        RpcConverter<Ethereum, EthEvmConfig, EthReceiptConverter<ChainSpec>>,
    > {
        EthApiBuilder::new(
            provider.clone(),
            testing_pool(),
            NoopNetwork::default(),
            EthEvmConfig::new(provider.chain_spec()),
        )
        .build()
    }
1262
1263 #[tokio::test]
1264 async fn test_range_block_mode_empty_range() {
1265 let provider = MockEthProvider::default();
1266 let eth_api = build_test_eth_api(provider);
1267
1268 let eth_filter = super::EthFilter::new(
1269 eth_api,
1270 EthFilterConfig::default(),
1271 Box::new(TokioTaskExecutor::default()),
1272 );
1273 let filter_inner = eth_filter.inner;
1274
1275 let headers = vec![];
1276 let max_range = 100;
1277
1278 let mut range_mode = RangeBlockMode {
1279 filter_inner,
1280 iter: headers.into_iter().peekable(),
1281 next: VecDeque::new(),
1282 max_range,
1283 pending_tasks: FuturesOrdered::new(),
1284 };
1285
1286 let result = range_mode.next().await;
1287 assert!(result.is_ok());
1288 assert!(result.unwrap().is_none());
1289 }
1290
1291 #[tokio::test]
1292 async fn test_range_block_mode_queued_results_priority() {
1293 let provider = MockEthProvider::default();
1294 let eth_api = build_test_eth_api(provider);
1295
1296 let eth_filter = super::EthFilter::new(
1297 eth_api,
1298 EthFilterConfig::default(),
1299 Box::new(TokioTaskExecutor::default()),
1300 );
1301 let filter_inner = eth_filter.inner;
1302
1303 let headers = vec![
1304 SealedHeader::new(
1305 alloy_consensus::Header { number: 100, ..Default::default() },
1306 FixedBytes::random(),
1307 ),
1308 SealedHeader::new(
1309 alloy_consensus::Header { number: 101, ..Default::default() },
1310 FixedBytes::random(),
1311 ),
1312 ];
1313
1314 let expected_block_hash_1 = FixedBytes::from([1u8; 32]);
1316 let expected_block_hash_2 = FixedBytes::from([2u8; 32]);
1317
1318 let mock_receipt_1 = reth_ethereum_primitives::Receipt {
1320 tx_type: TxType::Legacy,
1321 cumulative_gas_used: 100_000,
1322 logs: vec![],
1323 success: true,
1324 };
1325 let mock_receipt_2 = reth_ethereum_primitives::Receipt {
1326 tx_type: TxType::Eip1559,
1327 cumulative_gas_used: 200_000,
1328 logs: vec![],
1329 success: true,
1330 };
1331 let mock_receipt_3 = reth_ethereum_primitives::Receipt {
1332 tx_type: TxType::Eip2930,
1333 cumulative_gas_used: 150_000,
1334 logs: vec![],
1335 success: false, };
1337
1338 let mock_result_1 = ReceiptBlockResult {
1339 receipts: Arc::new(vec![mock_receipt_1.clone(), mock_receipt_2.clone()]),
1340 recovered_block: None,
1341 header: SealedHeader::new(
1342 alloy_consensus::Header { number: 42, ..Default::default() },
1343 expected_block_hash_1,
1344 ),
1345 };
1346
1347 let mock_result_2 = ReceiptBlockResult {
1348 receipts: Arc::new(vec![mock_receipt_3.clone()]),
1349 recovered_block: None,
1350 header: SealedHeader::new(
1351 alloy_consensus::Header { number: 43, ..Default::default() },
1352 expected_block_hash_2,
1353 ),
1354 };
1355
1356 let mut range_mode = RangeBlockMode {
1357 filter_inner,
1358 iter: headers.into_iter().peekable(),
1359 next: VecDeque::from([mock_result_1, mock_result_2]), max_range: 100,
1361 pending_tasks: FuturesOrdered::new(),
1362 };
1363
1364 let result1 = range_mode.next().await;
1366 assert!(result1.is_ok());
1367 let receipt_result1 = result1.unwrap().unwrap();
1368 assert_eq!(receipt_result1.header.hash(), expected_block_hash_1);
1369 assert_eq!(receipt_result1.header.number, 42);
1370
1371 assert_eq!(receipt_result1.receipts.len(), 2);
1373 assert_eq!(receipt_result1.receipts[0].tx_type, mock_receipt_1.tx_type);
1374 assert_eq!(
1375 receipt_result1.receipts[0].cumulative_gas_used,
1376 mock_receipt_1.cumulative_gas_used
1377 );
1378 assert_eq!(receipt_result1.receipts[0].success, mock_receipt_1.success);
1379 assert_eq!(receipt_result1.receipts[1].tx_type, mock_receipt_2.tx_type);
1380 assert_eq!(
1381 receipt_result1.receipts[1].cumulative_gas_used,
1382 mock_receipt_2.cumulative_gas_used
1383 );
1384 assert_eq!(receipt_result1.receipts[1].success, mock_receipt_2.success);
1385
1386 let result2 = range_mode.next().await;
1388 assert!(result2.is_ok());
1389 let receipt_result2 = result2.unwrap().unwrap();
1390 assert_eq!(receipt_result2.header.hash(), expected_block_hash_2);
1391 assert_eq!(receipt_result2.header.number, 43);
1392
1393 assert_eq!(receipt_result2.receipts.len(), 1);
1395 assert_eq!(receipt_result2.receipts[0].tx_type, mock_receipt_3.tx_type);
1396 assert_eq!(
1397 receipt_result2.receipts[0].cumulative_gas_used,
1398 mock_receipt_3.cumulative_gas_used
1399 );
1400 assert_eq!(receipt_result2.receipts[0].success, mock_receipt_3.success);
1401
1402 assert!(range_mode.next.is_empty());
1404
1405 let result3 = range_mode.next().await;
1406 assert!(result3.is_ok());
1407 }
1408
1409 #[tokio::test]
1410 async fn test_range_block_mode_single_block_no_receipts() {
1411 let provider = MockEthProvider::default();
1412 let eth_api = build_test_eth_api(provider);
1413
1414 let eth_filter = super::EthFilter::new(
1415 eth_api,
1416 EthFilterConfig::default(),
1417 Box::new(TokioTaskExecutor::default()),
1418 );
1419 let filter_inner = eth_filter.inner;
1420
1421 let headers = vec![SealedHeader::new(
1422 alloy_consensus::Header { number: 100, ..Default::default() },
1423 FixedBytes::random(),
1424 )];
1425
1426 let mut range_mode = RangeBlockMode {
1427 filter_inner,
1428 iter: headers.into_iter().peekable(),
1429 next: VecDeque::new(),
1430 max_range: 100,
1431 pending_tasks: FuturesOrdered::new(),
1432 };
1433
1434 let result = range_mode.next().await;
1435 assert!(result.is_ok());
1436 }
1437
1438 #[tokio::test]
1439 async fn test_range_block_mode_provider_receipts() {
1440 let provider = MockEthProvider::default();
1441
1442 let header_1 = alloy_consensus::Header { number: 100, ..Default::default() };
1443 let header_2 = alloy_consensus::Header { number: 101, ..Default::default() };
1444 let header_3 = alloy_consensus::Header { number: 102, ..Default::default() };
1445
1446 let block_hash_1 = FixedBytes::random();
1447 let block_hash_2 = FixedBytes::random();
1448 let block_hash_3 = FixedBytes::random();
1449
1450 provider.add_header(block_hash_1, header_1.clone());
1451 provider.add_header(block_hash_2, header_2.clone());
1452 provider.add_header(block_hash_3, header_3.clone());
1453
1454 let mock_log = alloy_primitives::Log {
1456 address: alloy_primitives::Address::ZERO,
1457 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1458 };
1459
1460 let receipt_100_1 = reth_ethereum_primitives::Receipt {
1461 tx_type: TxType::Legacy,
1462 cumulative_gas_used: 21_000,
1463 logs: vec![mock_log.clone()],
1464 success: true,
1465 };
1466 let receipt_100_2 = reth_ethereum_primitives::Receipt {
1467 tx_type: TxType::Eip1559,
1468 cumulative_gas_used: 42_000,
1469 logs: vec![mock_log.clone()],
1470 success: true,
1471 };
1472 let receipt_101_1 = reth_ethereum_primitives::Receipt {
1473 tx_type: TxType::Eip2930,
1474 cumulative_gas_used: 30_000,
1475 logs: vec![mock_log.clone()],
1476 success: false,
1477 };
1478
1479 provider.add_receipts(100, vec![receipt_100_1.clone(), receipt_100_2.clone()]);
1480 provider.add_receipts(101, vec![receipt_101_1.clone()]);
1481
1482 let eth_api = build_test_eth_api(provider);
1483
1484 let eth_filter = super::EthFilter::new(
1485 eth_api,
1486 EthFilterConfig::default(),
1487 Box::new(TokioTaskExecutor::default()),
1488 );
1489 let filter_inner = eth_filter.inner;
1490
1491 let headers = vec![
1492 SealedHeader::new(header_1, block_hash_1),
1493 SealedHeader::new(header_2, block_hash_2),
1494 SealedHeader::new(header_3, block_hash_3),
1495 ];
1496
1497 let mut range_mode = RangeBlockMode {
1498 filter_inner,
1499 iter: headers.into_iter().peekable(),
1500 next: VecDeque::new(),
1501 max_range: 3, pending_tasks: FuturesOrdered::new(),
1503 };
1504
1505 let result = range_mode.next().await;
1507 assert!(result.is_ok());
1508 let receipt_result = result.unwrap().unwrap();
1509
1510 assert_eq!(receipt_result.header.hash(), block_hash_1);
1511 assert_eq!(receipt_result.header.number, 100);
1512 assert_eq!(receipt_result.receipts.len(), 2);
1513
1514 assert_eq!(receipt_result.receipts[0].tx_type, receipt_100_1.tx_type);
1516 assert_eq!(
1517 receipt_result.receipts[0].cumulative_gas_used,
1518 receipt_100_1.cumulative_gas_used
1519 );
1520 assert_eq!(receipt_result.receipts[0].success, receipt_100_1.success);
1521
1522 assert_eq!(receipt_result.receipts[1].tx_type, receipt_100_2.tx_type);
1523 assert_eq!(
1524 receipt_result.receipts[1].cumulative_gas_used,
1525 receipt_100_2.cumulative_gas_used
1526 );
1527 assert_eq!(receipt_result.receipts[1].success, receipt_100_2.success);
1528
1529 let result2 = range_mode.next().await;
1531 assert!(result2.is_ok());
1532 let receipt_result2 = result2.unwrap().unwrap();
1533
1534 assert_eq!(receipt_result2.header.hash(), block_hash_2);
1535 assert_eq!(receipt_result2.header.number, 101);
1536 assert_eq!(receipt_result2.receipts.len(), 1);
1537
1538 assert_eq!(receipt_result2.receipts[0].tx_type, receipt_101_1.tx_type);
1540 assert_eq!(
1541 receipt_result2.receipts[0].cumulative_gas_used,
1542 receipt_101_1.cumulative_gas_used
1543 );
1544 assert_eq!(receipt_result2.receipts[0].success, receipt_101_1.success);
1545
1546 let result3 = range_mode.next().await;
1548 assert!(result3.is_ok());
1549 assert!(result3.unwrap().is_none());
1550 }
1551
1552 #[tokio::test]
1553 async fn test_range_block_mode_iterator_exhaustion() {
1554 let provider = MockEthProvider::default();
1555
1556 let header_100 = alloy_consensus::Header { number: 100, ..Default::default() };
1557 let header_101 = alloy_consensus::Header { number: 101, ..Default::default() };
1558
1559 let block_hash_100 = FixedBytes::random();
1560 let block_hash_101 = FixedBytes::random();
1561
1562 provider.add_header(block_hash_100, header_100.clone());
1564 provider.add_header(block_hash_101, header_101.clone());
1565
1566 let mock_receipt = reth_ethereum_primitives::Receipt {
1568 tx_type: TxType::Legacy,
1569 cumulative_gas_used: 21_000,
1570 logs: vec![],
1571 success: true,
1572 };
1573 provider.add_receipts(100, vec![mock_receipt.clone()]);
1574 provider.add_receipts(101, vec![mock_receipt.clone()]);
1575
1576 let eth_api = build_test_eth_api(provider);
1577
1578 let eth_filter = super::EthFilter::new(
1579 eth_api,
1580 EthFilterConfig::default(),
1581 Box::new(TokioTaskExecutor::default()),
1582 );
1583 let filter_inner = eth_filter.inner;
1584
1585 let headers = vec![
1586 SealedHeader::new(header_100, block_hash_100),
1587 SealedHeader::new(header_101, block_hash_101),
1588 ];
1589
1590 let mut range_mode = RangeBlockMode {
1591 filter_inner,
1592 iter: headers.into_iter().peekable(),
1593 next: VecDeque::new(),
1594 max_range: 1,
1595 pending_tasks: FuturesOrdered::new(),
1596 };
1597
1598 let result1 = range_mode.next().await;
1599 assert!(result1.is_ok());
1600 assert!(result1.unwrap().is_some()); assert!(range_mode.iter.peek().is_some()); let result2 = range_mode.next().await;
1605 assert!(result2.is_ok());
1606 assert!(result2.unwrap().is_some()); assert!(range_mode.iter.peek().is_none());
1610
1611 let result3 = range_mode.next().await;
1613 assert!(result3.is_ok());
1614 assert!(result3.unwrap().is_none());
1615 }
1616
1617 #[tokio::test]
1618 async fn test_cached_mode_with_mock_receipts() {
1619 let test_hash = FixedBytes::from([42u8; 32]);
1621 let test_block_number = 100u64;
1622 let test_header = SealedHeader::new(
1623 alloy_consensus::Header {
1624 number: test_block_number,
1625 gas_used: 50_000,
1626 ..Default::default()
1627 },
1628 test_hash,
1629 );
1630
1631 let mock_log = alloy_primitives::Log {
1633 address: alloy_primitives::Address::ZERO,
1634 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1635 };
1636
1637 let mock_receipt = reth_ethereum_primitives::Receipt {
1638 tx_type: TxType::Legacy,
1639 cumulative_gas_used: 21_000,
1640 logs: vec![mock_log],
1641 success: true,
1642 };
1643
1644 let provider = MockEthProvider::default();
1645 provider.add_header(test_hash, test_header.header().clone());
1646 provider.add_receipts(test_block_number, vec![mock_receipt.clone()]);
1647
1648 let eth_api = build_test_eth_api(provider);
1649 let eth_filter = super::EthFilter::new(
1650 eth_api,
1651 EthFilterConfig::default(),
1652 Box::new(TokioTaskExecutor::default()),
1653 );
1654 let filter_inner = eth_filter.inner;
1655
1656 let headers = vec![test_header.clone()];
1657
1658 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1659
1660 let result = cached_mode.next().await.expect("next should succeed");
1662 let receipt_block_result = result.expect("should have receipt result");
1663 assert_eq!(receipt_block_result.header.hash(), test_hash);
1664 assert_eq!(receipt_block_result.header.number, test_block_number);
1665 assert_eq!(receipt_block_result.receipts.len(), 1);
1666 assert_eq!(receipt_block_result.receipts[0].tx_type, mock_receipt.tx_type);
1667 assert_eq!(
1668 receipt_block_result.receipts[0].cumulative_gas_used,
1669 mock_receipt.cumulative_gas_used
1670 );
1671 assert_eq!(receipt_block_result.receipts[0].success, mock_receipt.success);
1672
1673 let result2 = cached_mode.next().await;
1675 assert!(result2.is_ok());
1676 assert!(result2.unwrap().is_none());
1677 }
1678
1679 #[tokio::test]
1680 async fn test_cached_mode_empty_headers() {
1681 let provider = MockEthProvider::default();
1682 let eth_api = build_test_eth_api(provider);
1683
1684 let eth_filter = super::EthFilter::new(
1685 eth_api,
1686 EthFilterConfig::default(),
1687 Box::new(TokioTaskExecutor::default()),
1688 );
1689 let filter_inner = eth_filter.inner;
1690
1691 let headers: Vec<SealedHeader<alloy_consensus::Header>> = vec![];
1692
1693 let mut cached_mode = CachedMode { filter_inner, headers_iter: headers.into_iter() };
1694
1695 let result = cached_mode.next().await.expect("next should succeed");
1697 assert!(result.is_none());
1698 }
1699
1700 #[tokio::test]
1701 async fn test_non_consecutive_headers_after_bloom_filter() {
1702 let provider = MockEthProvider::default();
1703
1704 let mut expected_hashes = vec![];
1706 let mut prev_hash = alloy_primitives::B256::default();
1707
1708 use alloy_consensus::TxLegacy;
1710 use reth_ethereum_primitives::{TransactionSigned, TxType};
1711
1712 let tx_inner = TxLegacy {
1713 chain_id: Some(1),
1714 nonce: 0,
1715 gas_price: 21_000,
1716 gas_limit: 21_000,
1717 to: alloy_primitives::TxKind::Call(alloy_primitives::Address::ZERO),
1718 value: alloy_primitives::U256::ZERO,
1719 input: alloy_primitives::Bytes::new(),
1720 };
1721 let signature = alloy_primitives::Signature::test_signature();
1722 let tx = TransactionSigned::new_unhashed(tx_inner.into(), signature);
1723
1724 for i in 100u64..=103 {
1725 let header = alloy_consensus::Header {
1726 number: i,
1727 parent_hash: prev_hash,
1728 logs_bloom: if i == 100 || i == 102 {
1730 alloy_primitives::Bloom::from([1u8; 256])
1731 } else {
1732 alloy_primitives::Bloom::default()
1733 },
1734 ..Default::default()
1735 };
1736
1737 let hash = header.hash_slow();
1738 expected_hashes.push(hash);
1739 prev_hash = hash;
1740
1741 let transactions = if i == 100 || i == 102 { vec![tx.clone()] } else { vec![] };
1743
1744 let block = reth_ethereum_primitives::Block {
1745 header,
1746 body: reth_ethereum_primitives::BlockBody { transactions, ..Default::default() },
1747 };
1748 provider.add_block(hash, block);
1749 }
1750
1751 let mock_log = alloy_primitives::Log {
1753 address: alloy_primitives::Address::ZERO,
1754 data: alloy_primitives::LogData::new_unchecked(vec![], alloy_primitives::Bytes::new()),
1755 };
1756
1757 let receipt = reth_ethereum_primitives::Receipt {
1758 tx_type: TxType::Legacy,
1759 cumulative_gas_used: 21_000,
1760 logs: vec![mock_log],
1761 success: true,
1762 };
1763
1764 provider.add_receipts(100, vec![receipt.clone()]);
1765 provider.add_receipts(101, vec![]);
1766 provider.add_receipts(102, vec![receipt.clone()]);
1767 provider.add_receipts(103, vec![]);
1768
1769 use reth_db_api::models::StoredBlockBodyIndices;
1771 provider
1772 .add_block_body_indices(100, StoredBlockBodyIndices { first_tx_num: 0, tx_count: 1 });
1773 provider
1774 .add_block_body_indices(101, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 0 });
1775 provider
1776 .add_block_body_indices(102, StoredBlockBodyIndices { first_tx_num: 1, tx_count: 1 });
1777 provider
1778 .add_block_body_indices(103, StoredBlockBodyIndices { first_tx_num: 2, tx_count: 0 });
1779
1780 let eth_api = build_test_eth_api(provider);
1781 let eth_filter = EthFilter::new(
1782 eth_api,
1783 EthFilterConfig::default(),
1784 Box::new(TokioTaskExecutor::default()),
1785 );
1786
1787 let filter = Filter::default();
1789
1790 let logs = eth_filter
1792 .inner
1793 .clone()
1794 .get_logs_in_block_range(filter, 100, 103, QueryLimits::default())
1795 .await
1796 .expect("should succeed");
1797
1798 assert_eq!(logs.len(), 2);
1800
1801 assert_eq!(logs[0].block_number, Some(100));
1802 assert_eq!(logs[1].block_number, Some(102));
1803
1804 assert_eq!(logs[0].block_hash, Some(expected_hashes[0])); assert_eq!(logs[1].block_hash, Some(expected_hashes[2])); }
1808}