1use alloy_consensus::BlockHeader;
4use alloy_primitives::TxHash;
5use alloy_rpc_types_eth::{
6 BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
7 PendingTransactionFilterKind,
8};
9use async_trait::async_trait;
10use futures::future::TryFutureExt;
11use jsonrpsee::{core::RpcResult, server::IdProvider};
12use reth_errors::ProviderError;
13use reth_primitives_traits::NodePrimitives;
14use reth_rpc_eth_api::{
15 EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcNodeCore,
16 RpcNodeCoreExt, RpcTransaction, TransactionCompat,
17};
18use reth_rpc_eth_types::{
19 logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
20 EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
21};
22use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
23use reth_storage_api::{
24 BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
25 ProviderReceipt, TransactionsProvider,
26};
27use reth_tasks::TaskSpawner;
28use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
29use std::{
30 collections::HashMap,
31 fmt,
32 future::Future,
33 iter::StepBy,
34 ops::RangeInclusive,
35 sync::Arc,
36 time::{Duration, Instant},
37};
38use tokio::{
39 sync::{mpsc::Receiver, oneshot, Mutex},
40 time::MissedTickBehavior,
41};
42use tracing::{error, trace};
43
44impl<Eth> EngineEthFilter for EthFilter<Eth>
45where
46 Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
47{
48 fn logs(
50 &self,
51 filter: Filter,
52 limits: QueryLimits,
53 ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
54 trace!(target: "rpc::eth", "Serving eth_getLogs");
55 self.logs_for_filter(filter, limits).map_err(|e| e.into())
56 }
57}
58
59const MAX_HEADERS_RANGE: u64 = 1_000; pub struct EthFilter<Eth: EthApiTypes> {
66 inner: Arc<EthFilterInner<Eth>>,
68}
69
70impl<Eth> Clone for EthFilter<Eth>
71where
72 Eth: EthApiTypes,
73{
74 fn clone(&self) -> Self {
75 Self { inner: self.inner.clone() }
76 }
77}
78
79impl<Eth> EthFilter<Eth>
80where
81 Eth: EthApiTypes + 'static,
82{
83 pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
111 let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
112 config;
113 let inner = EthFilterInner {
114 eth_api,
115 active_filters: ActiveFilters::new(),
116 id_provider: Arc::new(EthSubscriptionIdProvider::default()),
117 max_headers_range: MAX_HEADERS_RANGE,
118 task_spawner,
119 stale_filter_ttl,
120 query_limits: QueryLimits { max_blocks_per_filter, max_logs_per_response },
121 };
122
123 let eth_filter = Self { inner: Arc::new(inner) };
124
125 let this = eth_filter.clone();
126 eth_filter.inner.task_spawner.spawn_critical(
127 "eth-filters_stale-filters-clean",
128 Box::pin(async move {
129 this.watch_and_clear_stale_filters().await;
130 }),
131 );
132
133 eth_filter
134 }
135
136 pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
138 &self.inner.active_filters
139 }
140
141 async fn watch_and_clear_stale_filters(&self) {
144 let mut interval = tokio::time::interval_at(
145 tokio::time::Instant::now() + self.inner.stale_filter_ttl,
146 self.inner.stale_filter_ttl,
147 );
148 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
149 loop {
150 interval.tick().await;
151 self.clear_stale_filters(Instant::now()).await;
152 }
153 }
154
155 pub async fn clear_stale_filters(&self, now: Instant) {
158 trace!(target: "rpc::eth", "clear stale filters");
159 self.active_filters().inner.lock().await.retain(|id, filter| {
160 let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;
161
162 if !is_valid {
163 trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
164 }
165
166 is_valid
167 })
168 }
169}
170
171impl<Eth> EthFilter<Eth>
172where
173 Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
174{
175 fn provider(&self) -> &Eth::Provider {
177 self.inner.eth_api.provider()
178 }
179
180 fn pool(&self) -> &Eth::Pool {
182 self.inner.eth_api.pool()
183 }
184
185 pub async fn filter_changes(
187 &self,
188 id: FilterId,
189 ) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
190 let info = self.provider().chain_info()?;
191 let best_number = info.best_number;
192
193 let (start_block, kind) = {
196 let mut filters = self.inner.active_filters.inner.lock().await;
197 let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;
198
199 if filter.block > best_number {
200 return Ok(FilterChanges::Empty)
202 }
203
204 let mut block = best_number + 1;
208 std::mem::swap(&mut filter.block, &mut block);
209 filter.last_poll_timestamp = Instant::now();
210
211 (block, filter.kind.clone())
212 };
213
214 match kind {
215 FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
216 FilterKind::Block => {
217 let end_block = best_number + 1;
220 let block_hashes =
221 self.provider().canonical_hashes_range(start_block, end_block).map_err(
222 |_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
223 )?;
224 Ok(FilterChanges::Hashes(block_hashes))
225 }
226 FilterKind::Log(filter) => {
227 let (from_block_number, to_block_number) = match filter.block_option {
228 FilterBlockOption::Range { from_block, to_block } => {
229 let from = from_block
230 .map(|num| self.provider().convert_block_number(num))
231 .transpose()?
232 .flatten();
233 let to = to_block
234 .map(|num| self.provider().convert_block_number(num))
235 .transpose()?
236 .flatten();
237 logs_utils::get_filter_block_range(from, to, start_block, info)
238 }
239 FilterBlockOption::AtBlockHash(_) => {
240 (start_block, best_number)
244 }
245 };
246 let logs = self
247 .inner
248 .clone()
249 .get_logs_in_block_range(
250 *filter,
251 from_block_number,
252 to_block_number,
253 self.inner.query_limits,
254 )
255 .await?;
256 Ok(FilterChanges::Logs(logs))
257 }
258 }
259 }
260
261 pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
267 let filter = {
268 let filters = self.inner.active_filters.inner.lock().await;
269 if let FilterKind::Log(ref filter) =
270 filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
271 {
272 *filter.clone()
273 } else {
274 return Err(EthFilterError::FilterNotFound(id))
276 }
277 };
278
279 self.logs_for_filter(filter, self.inner.query_limits).await
280 }
281
282 async fn logs_for_filter(
284 &self,
285 filter: Filter,
286 limits: QueryLimits,
287 ) -> Result<Vec<Log>, EthFilterError> {
288 self.inner.clone().logs_for_filter(filter, limits).await
289 }
290}
291
292#[async_trait]
293impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
294where
295 Eth: FullEthApiTypes
296 + RpcNodeCoreExt<
297 Provider: BlockIdReader,
298 Primitives: NodePrimitives<
299 SignedTx = <<Eth as RpcNodeCore>::Provider as TransactionsProvider>::Transaction,
300 >,
301 > + 'static,
302{
303 async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
305 trace!(target: "rpc::eth", "Serving eth_newFilter");
306 self.inner
307 .install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Log(Box::new(filter)))
308 .await
309 }
310
311 async fn new_block_filter(&self) -> RpcResult<FilterId> {
313 trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
314 self.inner.install_filter(FilterKind::<RpcTransaction<Eth::NetworkTypes>>::Block).await
315 }
316
317 async fn new_pending_transaction_filter(
319 &self,
320 kind: Option<PendingTransactionFilterKind>,
321 ) -> RpcResult<FilterId> {
322 trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
323
324 let transaction_kind = match kind.unwrap_or_default() {
325 PendingTransactionFilterKind::Hashes => {
326 let receiver = self.pool().pending_transactions_listener();
327 let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
328 FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
329 }
330 PendingTransactionFilterKind::Full => {
331 let stream = self.pool().new_pending_pool_transactions_listener();
332 let full_txs_receiver = FullTransactionsReceiver::new(
333 stream,
334 self.inner.eth_api.tx_resp_builder().clone(),
335 );
336 FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
337 full_txs_receiver,
338 )))
339 }
340 };
341
342 self.inner.install_filter(transaction_kind).await
346 }
347
348 async fn filter_changes(
350 &self,
351 id: FilterId,
352 ) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
353 trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
354 Ok(Self::filter_changes(self, id).await?)
355 }
356
357 async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
363 trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
364 Ok(Self::filter_logs(self, id).await?)
365 }
366
367 async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
369 trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
370 let mut filters = self.inner.active_filters.inner.lock().await;
371 if filters.remove(&id).is_some() {
372 trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
373 Ok(true)
374 } else {
375 Ok(false)
376 }
377 }
378
379 async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
383 trace!(target: "rpc::eth", "Serving eth_getLogs");
384 Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
385 }
386}
387
388impl<Eth> std::fmt::Debug for EthFilter<Eth>
389where
390 Eth: EthApiTypes,
391{
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 f.debug_struct("EthFilter").finish_non_exhaustive()
394 }
395}
396
397#[derive(Debug)]
399struct EthFilterInner<Eth: EthApiTypes> {
400 eth_api: Eth,
402 active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
404 id_provider: Arc<dyn IdProvider>,
406 query_limits: QueryLimits,
408 max_headers_range: u64,
410 task_spawner: Box<dyn TaskSpawner>,
412 stale_filter_ttl: Duration,
414}
415
416impl<Eth> EthFilterInner<Eth>
417where
418 Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
419 + EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
420 + 'static,
421{
422 fn provider(&self) -> &Eth::Provider {
424 self.eth_api.provider()
425 }
426
427 fn eth_cache(
429 &self,
430 ) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
431 self.eth_api.cache()
432 }
433
434 async fn logs_for_filter(
436 self: Arc<Self>,
437 filter: Filter,
438 limits: QueryLimits,
439 ) -> Result<Vec<Log>, EthFilterError> {
440 match filter.block_option {
441 FilterBlockOption::AtBlockHash(block_hash) => {
442 let header = self
445 .provider()
446 .header_by_hash_or_number(block_hash.into())?
447 .ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
448
449 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
450
451 let (receipts, maybe_block) = self
454 .eth_cache()
455 .get_receipts_and_maybe_block(block_num_hash.hash)
456 .await?
457 .ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
458
459 let mut all_logs = Vec::new();
460 append_matching_block_logs(
461 &mut all_logs,
462 maybe_block
463 .map(ProviderOrBlock::Block)
464 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
465 &filter,
466 block_num_hash,
467 &receipts,
468 false,
469 header.timestamp(),
470 )?;
471
472 Ok(all_logs)
473 }
474 FilterBlockOption::Range { from_block, to_block } => {
475 let info = self.provider().chain_info()?;
477
478 let start_block = info.best_number;
480 let from = from_block
481 .map(|num| self.provider().convert_block_number(num))
482 .transpose()?
483 .flatten();
484 let to = to_block
485 .map(|num| self.provider().convert_block_number(num))
486 .transpose()?
487 .flatten();
488 let (from_block_number, to_block_number) =
489 logs_utils::get_filter_block_range(from, to, start_block, info);
490 self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
491 .await
492 }
493 }
494 }
495
496 async fn install_filter(
498 &self,
499 kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
500 ) -> RpcResult<FilterId> {
501 let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
502 let subscription_id = self.id_provider.next_id();
503
504 let id = match subscription_id {
505 jsonrpsee_types::SubscriptionId::Num(n) => FilterId::Num(n),
506 jsonrpsee_types::SubscriptionId::Str(s) => FilterId::Str(s.into_owned()),
507 };
508 let mut filters = self.active_filters.inner.lock().await;
509 filters.insert(
510 id.clone(),
511 ActiveFilter {
512 block: last_poll_block_number,
513 last_poll_timestamp: Instant::now(),
514 kind,
515 },
516 );
517 Ok(id)
518 }
519
520 async fn get_logs_in_block_range(
526 self: Arc<Self>,
527 filter: Filter,
528 from_block: u64,
529 to_block: u64,
530 limits: QueryLimits,
531 ) -> Result<Vec<Log>, EthFilterError> {
532 trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
533
534 if to_block < from_block {
536 return Err(EthFilterError::InvalidBlockRangeParams)
537 }
538
539 if let Some(max_blocks_per_filter) =
540 limits.max_blocks_per_filter.filter(|limit| to_block - from_block > *limit)
541 {
542 return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
543 }
544
545 let (tx, rx) = oneshot::channel();
546 let this = self.clone();
547 self.task_spawner.spawn_blocking(Box::pin(async move {
548 let res =
549 this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
550 let _ = tx.send(res);
551 }));
552
553 rx.await.map_err(|_| EthFilterError::InternalError)?
554 }
555
556 async fn get_logs_in_block_range_inner(
565 &self,
566 filter: &Filter,
567 from_block: u64,
568 to_block: u64,
569 limits: QueryLimits,
570 ) -> Result<Vec<Log>, EthFilterError> {
571 let mut all_logs = Vec::new();
572
573 for (from, to) in
576 BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
577 {
578 let headers = self.provider().headers_range(from..=to)?;
579 for (idx, header) in headers
580 .iter()
581 .enumerate()
582 .filter(|(_, header)| filter.matches_bloom(header.logs_bloom()))
583 {
584 let block_hash = match headers.get(idx + 1) {
587 Some(child) => child.parent_hash(),
588 None => self
589 .provider()
590 .block_hash(header.number())?
591 .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
592 };
593
594 let num_hash = BlockNumHash::new(header.number(), block_hash);
595 if let Some((receipts, maybe_block)) =
596 self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await?
597 {
598 append_matching_block_logs(
599 &mut all_logs,
600 maybe_block
601 .map(ProviderOrBlock::Block)
602 .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
603 filter,
604 num_hash,
605 &receipts,
606 false,
607 header.timestamp(),
608 )?;
609
610 let is_multi_block_range = from_block != to_block;
613 if let Some(max_logs_per_response) = limits.max_logs_per_response {
614 if is_multi_block_range && all_logs.len() > max_logs_per_response {
615 return Err(EthFilterError::QueryExceedsMaxResults {
616 max_logs: max_logs_per_response,
617 from_block,
618 to_block: num_hash.number.saturating_sub(1),
619 });
620 }
621 }
622 }
623 }
624 }
625
626 Ok(all_logs)
627 }
628}
629
630#[derive(Debug, Clone, Default)]
632pub struct ActiveFilters<T> {
633 inner: Arc<Mutex<HashMap<FilterId, ActiveFilter<T>>>>,
634}
635
636impl<T> ActiveFilters<T> {
637 pub fn new() -> Self {
639 Self { inner: Arc::new(Mutex::new(HashMap::default())) }
640 }
641}
642
643#[derive(Debug)]
645struct ActiveFilter<T> {
646 block: u64,
648 last_poll_timestamp: Instant,
650 kind: FilterKind<T>,
652}
653
654#[derive(Debug, Clone)]
656struct PendingTransactionsReceiver {
657 txs_receiver: Arc<Mutex<Receiver<TxHash>>>,
658}
659
660impl PendingTransactionsReceiver {
661 fn new(receiver: Receiver<TxHash>) -> Self {
662 Self { txs_receiver: Arc::new(Mutex::new(receiver)) }
663 }
664
665 async fn drain<T>(&self) -> FilterChanges<T> {
667 let mut pending_txs = Vec::new();
668 let mut prepared_stream = self.txs_receiver.lock().await;
669
670 while let Ok(tx_hash) = prepared_stream.try_recv() {
671 pending_txs.push(tx_hash);
672 }
673
674 FilterChanges::Hashes(pending_txs)
676 }
677}
678
679#[derive(Debug, Clone)]
681struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
682 txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
683 tx_resp_builder: TxCompat,
684}
685
686impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
687where
688 T: PoolTransaction + 'static,
689 TxCompat: TransactionCompat<Primitives: NodePrimitives<SignedTx = T::Consensus>>,
690{
691 fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
693 Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
694 }
695
696 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
698 let mut pending_txs = Vec::new();
699 let mut prepared_stream = self.txs_stream.lock().await;
700
701 while let Ok(tx) = prepared_stream.try_recv() {
702 match self.tx_resp_builder.fill_pending(tx.transaction.to_consensus()) {
703 Ok(tx) => pending_txs.push(tx),
704 Err(err) => {
705 error!(target: "rpc",
706 %err,
707 "Failed to fill txn with block context"
708 );
709 }
710 }
711 }
712 FilterChanges::Transactions(pending_txs)
713 }
714}
715
716#[async_trait]
718trait FullTransactionsFilter<T>: fmt::Debug + Send + Sync + Unpin + 'static {
719 async fn drain(&self) -> FilterChanges<T>;
720}
721
722#[async_trait]
723impl<T, TxCompat> FullTransactionsFilter<RpcTransaction<TxCompat::Network>>
724 for FullTransactionsReceiver<T, TxCompat>
725where
726 T: PoolTransaction + 'static,
727 TxCompat: TransactionCompat<Primitives: NodePrimitives<SignedTx = T::Consensus>> + 'static,
728{
729 async fn drain(&self) -> FilterChanges<RpcTransaction<TxCompat::Network>> {
730 Self::drain(self).await
731 }
732}
733
734#[derive(Debug, Clone)]
740enum PendingTransactionKind<T> {
741 Hashes(PendingTransactionsReceiver),
742 FullTransaction(Arc<dyn FullTransactionsFilter<T>>),
743}
744
745impl<T: 'static> PendingTransactionKind<T> {
746 async fn drain(&self) -> FilterChanges<T> {
747 match self {
748 Self::Hashes(receiver) => receiver.drain().await,
749 Self::FullTransaction(receiver) => receiver.drain().await,
750 }
751 }
752}
753
754#[derive(Clone, Debug)]
755enum FilterKind<T> {
756 Log(Box<Filter>),
757 Block,
758 PendingTransaction(PendingTransactionKind<T>),
759}
760
761#[derive(Debug)]
763struct BlockRangeInclusiveIter {
764 iter: StepBy<RangeInclusive<u64>>,
765 step: u64,
766 end: u64,
767}
768
769impl BlockRangeInclusiveIter {
770 fn new(range: RangeInclusive<u64>, step: u64) -> Self {
771 Self { end: *range.end(), iter: range.step_by(step as usize + 1), step }
772 }
773}
774
775impl Iterator for BlockRangeInclusiveIter {
776 type Item = (u64, u64);
777
778 fn next(&mut self) -> Option<Self::Item> {
779 let start = self.iter.next()?;
780 let end = (start + self.step).min(self.end);
781 if start > end {
782 return None
783 }
784 Some((start, end))
785 }
786}
787
788#[derive(Debug, thiserror::Error)]
790pub enum EthFilterError {
791 #[error("filter not found")]
793 FilterNotFound(FilterId),
794 #[error("invalid block range params")]
796 InvalidBlockRangeParams,
797 #[error("query exceeds max block range {0}")]
799 QueryExceedsMaxBlocks(u64),
800 #[error("query exceeds max results {max_logs}, retry with the range {from_block}-{to_block}")]
802 QueryExceedsMaxResults {
803 max_logs: usize,
805 from_block: u64,
807 to_block: u64,
809 },
810 #[error(transparent)]
812 EthAPIError(#[from] EthApiError),
813 #[error("internal filter error")]
815 InternalError,
816}
817
818impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
819 fn from(err: EthFilterError) -> Self {
820 match err {
821 EthFilterError::FilterNotFound(_) => rpc_error_with_code(
822 jsonrpsee::types::error::INVALID_PARAMS_CODE,
823 "filter not found",
824 ),
825 err @ EthFilterError::InternalError => {
826 rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string())
827 }
828 EthFilterError::EthAPIError(err) => err.into(),
829 err @ (EthFilterError::InvalidBlockRangeParams |
830 EthFilterError::QueryExceedsMaxBlocks(_) |
831 EthFilterError::QueryExceedsMaxResults { .. }) => {
832 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
833 }
834 }
835 }
836}
837
838impl From<ProviderError> for EthFilterError {
839 fn from(err: ProviderError) -> Self {
840 Self::EthAPIError(err.into())
841 }
842}
843
844#[cfg(test)]
845mod tests {
846 use super::*;
847 use rand::Rng;
848 use reth_testing_utils::generators;
849
850 #[test]
851 fn test_block_range_iter() {
852 let mut rng = generators::rng();
853
854 let start = rng.random::<u32>() as u64;
855 let end = start.saturating_add(rng.random::<u32>() as u64);
856 let step = rng.random::<u16>() as u64;
857 let range = start..=end;
858 let mut iter = BlockRangeInclusiveIter::new(range.clone(), step);
859 let (from, mut end) = iter.next().unwrap();
860 assert_eq!(from, start);
861 assert_eq!(end, (from + step).min(*range.end()));
862
863 for (next_from, next_end) in iter {
864 assert_eq!(next_from, end + 1);
866 end = next_end;
867 }
868
869 assert_eq!(end, *range.end());
870 }
871}