reth_rpc_api_testing_util/
trace.rs

1//! Helpers for testing trace calls.
2
3use alloy_eips::BlockId;
4use alloy_primitives::{map::HashSet, Bytes, TxHash, B256};
5use alloy_rpc_types_eth::{transaction::TransactionRequest, Index};
6use alloy_rpc_types_trace::{
7    filter::TraceFilter,
8    opcode::BlockOpcodeGas,
9    parity::{LocalizedTransactionTrace, TraceResults, TraceType},
10    tracerequest::TraceCallRequest,
11};
12use futures::{Stream, StreamExt};
13use jsonrpsee::core::client::Error as RpcError;
14use reth_rpc_api::clients::TraceApiClient;
15use std::{
16    pin::Pin,
17    task::{Context, Poll},
18};
19
20/// A type alias that represents the result of a raw transaction trace stream.
21type RawTransactionTraceResult<'a> =
22    Pin<Box<dyn Stream<Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>> + 'a>>;
23
24/// A result type for the `trace_block` method that also captures the requested block.
25pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
26
27/// A result type for the `trace_blockOpcodeGas` method that also captures the requested block.
28pub type TraceBlockOpCodeGasResult = Result<(BlockOpcodeGas, BlockId), (RpcError, BlockId)>;
29
30/// Type alias representing the result of replaying a transaction.
31pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
32
33/// A type representing the result of calling `trace_call_many` method.
34pub type CallManyTraceResult = Result<
35    (Vec<TraceResults>, Vec<(TransactionRequest, HashSet<TraceType>)>),
36    (RpcError, Vec<(TransactionRequest, HashSet<TraceType>)>),
37>;
38
39/// Result type for the `trace_get` method that also captures the requested transaction hash and
40/// index.
41pub type TraceGetResult =
42    Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
43
44/// Represents a result type for the `trace_filter` stream extension.
45pub type TraceFilterResult =
46    Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;
47
48/// Represents the result of a single trace call.
49pub type TraceCallResult = Result<TraceResults, (RpcError, TraceCallRequest)>;
50
51/// An extension trait for the Trace API.
52pub trait TraceApiExt {
53    /// The provider type that is used to make the requests.
54    type Provider;
55
56    /// Returns a new stream that yields the traces for the given blocks.
57    ///
58    /// See also [`StreamExt::buffered`].
59    fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
60    where
61        I: IntoIterator<Item = B>,
62        B: Into<BlockId>;
63
64    /// Returns a new stream that yields the traces for the given blocks.
65    ///
66    /// See also [`StreamExt::buffer_unordered`].
67    fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
68    where
69        I: IntoIterator<Item = B>,
70        B: Into<BlockId>;
71
72    /// Returns a new stream that yields the traces the opcodes for the given blocks.
73    ///
74    /// See also [`StreamExt::buffered`].
75    fn trace_block_opcode_gas_unordered<I, B>(
76        &self,
77        params: I,
78        n: usize,
79    ) -> TraceBlockOpcodeGasStream<'_>
80    where
81        I: IntoIterator<Item = B>,
82        B: Into<BlockId>;
83
84    /// Returns a new stream that replays the transactions for the given transaction hashes.
85    ///
86    /// This returns all results in order.
87    fn replay_transactions<I>(
88        &self,
89        tx_hashes: I,
90        trace_types: HashSet<TraceType>,
91    ) -> ReplayTransactionStream<'_>
92    where
93        I: IntoIterator<Item = TxHash>;
94
95    /// Returns a new stream that traces the provided raw transaction data.
96    fn trace_raw_transaction_stream(
97        &self,
98        data: Bytes,
99        trace_types: HashSet<TraceType>,
100        block_id: Option<BlockId>,
101    ) -> RawTransactionTraceStream<'_>;
102
103    /// Creates a stream of results for multiple dependent transaction calls on top of the same
104    /// block.
105    fn trace_call_many_stream<I>(
106        &self,
107        calls: I,
108        block_id: Option<BlockId>,
109    ) -> CallManyTraceStream<'_>
110    where
111        I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>;
112
113    /// Returns a new stream that yields the traces for the given transaction hash and indices.
114    fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
115    where
116        I: IntoIterator<Item = Index>;
117
118    /// Returns a new stream that yields traces for given filters.
119    fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
120    where
121        I: IntoIterator<Item = TraceFilter>;
122
123    /// Returns a new stream that yields the trace results for the given call requests.
124    fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_>;
125}
126/// `TraceCallStream` provides an asynchronous stream of tracing results.
127#[must_use = "streams do nothing unless polled"]
128pub struct TraceCallStream<'a> {
129    stream: Pin<Box<dyn Stream<Item = TraceCallResult> + 'a>>,
130}
131
132impl Stream for TraceCallStream<'_> {
133    type Item = TraceCallResult;
134
135    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        self.stream.as_mut().poll_next(cx)
137    }
138}
139
140impl std::fmt::Debug for TraceCallStream<'_> {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("TraceCallStream").finish()
143    }
144}
145
146/// Represents a stream that asynchronously yields the results of the `trace_filter` method.
147#[must_use = "streams do nothing unless polled"]
148pub struct TraceFilterStream<'a> {
149    stream: Pin<Box<dyn Stream<Item = TraceFilterResult> + 'a>>,
150}
151
152impl Stream for TraceFilterStream<'_> {
153    type Item = TraceFilterResult;
154
155    /// Attempts to pull out the next value of the stream.
156    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157        self.stream.as_mut().poll_next(cx)
158    }
159}
160
161impl std::fmt::Debug for TraceFilterStream<'_> {
162    /// Provides a debug representation of the `TraceFilterStream`.
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("TraceFilterStream").finish_non_exhaustive()
165    }
166}
167
168/// A stream that asynchronously yields the results of the `trace_get` method for a given
169/// transaction hash and a series of indices.
170#[must_use = "streams do nothing unless polled"]
171pub struct TraceGetStream<'a> {
172    stream: Pin<Box<dyn Stream<Item = TraceGetResult> + 'a>>,
173}
174
175impl Stream for TraceGetStream<'_> {
176    type Item = TraceGetResult;
177
178    /// Attempts to pull out the next item of the stream
179    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180        self.stream.as_mut().poll_next(cx)
181    }
182}
183
184impl std::fmt::Debug for TraceGetStream<'_> {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("TraceGetStream").finish_non_exhaustive()
187    }
188}
189
190/// A stream that provides asynchronous iteration over results from the `trace_call_many` function.
191///
192/// The stream yields items of type `CallManyTraceResult`.
193#[must_use = "streams do nothing unless polled"]
194pub struct CallManyTraceStream<'a> {
195    stream: Pin<Box<dyn Stream<Item = CallManyTraceResult> + 'a>>,
196}
197
198impl Stream for CallManyTraceStream<'_> {
199    type Item = CallManyTraceResult;
200
201    /// Polls for the next item from the stream.
202    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203        self.stream.as_mut().poll_next(cx)
204    }
205}
206
207impl std::fmt::Debug for CallManyTraceStream<'_> {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        f.debug_struct("CallManyTraceStream").finish()
210    }
211}
212
213/// A stream that traces the provided raw transaction data.
214#[must_use = "streams do nothing unless polled"]
215pub struct RawTransactionTraceStream<'a> {
216    stream: RawTransactionTraceResult<'a>,
217}
218
219impl Stream for RawTransactionTraceStream<'_> {
220    type Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>;
221
222    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223        self.stream.as_mut().poll_next(cx)
224    }
225}
226
227impl std::fmt::Debug for RawTransactionTraceStream<'_> {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        f.debug_struct("RawTransactionTraceStream").finish()
230    }
231}
232
233/// A stream that replays the transactions for the requested hashes.
234#[must_use = "streams do nothing unless polled"]
235pub struct ReplayTransactionStream<'a> {
236    stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
237}
238
239impl Stream for ReplayTransactionStream<'_> {
240    type Item = ReplayTransactionResult;
241
242    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243        self.stream.as_mut().poll_next(cx)
244    }
245}
246
247impl std::fmt::Debug for ReplayTransactionStream<'_> {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        f.debug_struct("ReplayTransactionStream").finish()
250    }
251}
252
253impl<T: TraceApiClient<TransactionRequest> + Sync> TraceApiExt for T {
254    type Provider = T;
255
256    fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
257    where
258        I: IntoIterator<Item = B>,
259        B: Into<BlockId>,
260    {
261        let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
262        let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
263            match self.trace_block(block).await {
264                Ok(result) => Ok((result.unwrap_or_default(), block)),
265                Err(err) => Err((err, block)),
266            }
267        }))
268        .buffered(n);
269        TraceBlockStream { stream: Box::pin(stream) }
270    }
271
272    fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
273    where
274        I: IntoIterator<Item = B>,
275        B: Into<BlockId>,
276    {
277        let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
278        let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
279            match self.trace_block(block).await {
280                Ok(result) => Ok((result.unwrap_or_default(), block)),
281                Err(err) => Err((err, block)),
282            }
283        }))
284        .buffer_unordered(n);
285        TraceBlockStream { stream: Box::pin(stream) }
286    }
287
288    fn trace_block_opcode_gas_unordered<I, B>(
289        &self,
290        params: I,
291        n: usize,
292    ) -> TraceBlockOpcodeGasStream<'_>
293    where
294        I: IntoIterator<Item = B>,
295        B: Into<BlockId>,
296    {
297        let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
298        let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
299            match self.trace_block_opcode_gas(block).await {
300                Ok(result) => Ok((result.unwrap(), block)),
301                Err(err) => Err((err, block)),
302            }
303        }))
304        .buffered(n);
305        TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
306    }
307
308    fn replay_transactions<I>(
309        &self,
310        tx_hashes: I,
311        trace_types: HashSet<TraceType>,
312    ) -> ReplayTransactionStream<'_>
313    where
314        I: IntoIterator<Item = TxHash>,
315    {
316        let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
317        let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
318            let trace_types_clone = trace_types.clone(); // Clone outside of the async block
319            async move {
320                match self.replay_transaction(hash, trace_types_clone).await {
321                    Ok(result) => Ok((result, hash)),
322                    Err(err) => Err((err, hash)),
323                }
324            }
325        }))
326        .buffered(10);
327        ReplayTransactionStream { stream: Box::pin(stream) }
328    }
329
330    fn trace_raw_transaction_stream(
331        &self,
332        data: Bytes,
333        trace_types: HashSet<TraceType>,
334        block_id: Option<BlockId>,
335    ) -> RawTransactionTraceStream<'_> {
336        let stream = futures::stream::once(async move {
337            match self.trace_raw_transaction(data.clone(), trace_types, block_id).await {
338                Ok(result) => Ok((result, data)),
339                Err(err) => Err((err, data)),
340            }
341        });
342        RawTransactionTraceStream { stream: Box::pin(stream) }
343    }
344
345    fn trace_call_many_stream<I>(
346        &self,
347        calls: I,
348        block_id: Option<BlockId>,
349    ) -> CallManyTraceStream<'_>
350    where
351        I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>,
352    {
353        let call_set = calls.into_iter().collect::<Vec<_>>();
354        let stream = futures::stream::once(async move {
355            match self.trace_call_many(call_set.clone(), block_id).await {
356                Ok(results) => Ok((results, call_set)),
357                Err(err) => Err((err, call_set)),
358            }
359        });
360        CallManyTraceStream { stream: Box::pin(stream) }
361    }
362
363    fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
364    where
365        I: IntoIterator<Item = Index>,
366    {
367        let index_list = indices.into_iter().collect::<Vec<_>>();
368        let stream = futures::stream::iter(index_list.into_iter().map(move |index| async move {
369            match self.trace_get(hash, vec![index]).await {
370                Ok(result) => Ok((result, hash, vec![index])),
371                Err(err) => Err((err, hash, vec![index])),
372            }
373        }))
374        .buffered(10);
375        TraceGetStream { stream: Box::pin(stream) }
376    }
377
378    fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
379    where
380        I: IntoIterator<Item = TraceFilter>,
381    {
382        let filter_list = filters.into_iter().collect::<Vec<_>>();
383        let stream = futures::stream::iter(filter_list.into_iter().map(move |filter| async move {
384            match self.trace_filter(filter.clone()).await {
385                Ok(result) => Ok((result, filter)),
386                Err(err) => Err((err, filter)),
387            }
388        }))
389        .buffered(10);
390        TraceFilterStream { stream: Box::pin(stream) }
391    }
392
393    fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_> {
394        let stream = futures::stream::once(async move {
395            match self
396                .trace_call(
397                    request.call.clone(),
398                    request.trace_types.clone(),
399                    request.block_id,
400                    request.state_overrides.clone(),
401                    request.block_overrides.clone(),
402                )
403                .await
404            {
405                Ok(result) => Ok(result),
406                Err(err) => Err((err, request)),
407            }
408        });
409        TraceCallStream { stream: Box::pin(stream) }
410    }
411}
412
413/// A stream that yields the traces for the requested blocks.
414#[must_use = "streams do nothing unless polled"]
415pub struct TraceBlockStream<'a> {
416    stream: Pin<Box<dyn Stream<Item = TraceBlockResult> + 'a>>,
417}
418
419impl TraceBlockStream<'_> {
420    /// Returns the next error result of the stream.
421    pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
422        loop {
423            match self.next().await? {
424                Ok(_) => {}
425                Err(err) => return Some(err),
426            }
427        }
428    }
429}
430
431impl Stream for TraceBlockStream<'_> {
432    type Item = TraceBlockResult;
433
434    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
435        self.stream.as_mut().poll_next(cx)
436    }
437}
438
439impl std::fmt::Debug for TraceBlockStream<'_> {
440    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441        f.debug_struct("TraceBlockStream").finish_non_exhaustive()
442    }
443}
444
445/// A stream that yields the opcodes for the requested blocks.
446#[must_use = "streams do nothing unless polled"]
447pub struct TraceBlockOpcodeGasStream<'a> {
448    stream: Pin<Box<dyn Stream<Item = TraceBlockOpCodeGasResult> + 'a>>,
449}
450
451impl TraceBlockOpcodeGasStream<'_> {
452    /// Returns the next error result of the stream.
453    pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
454        loop {
455            match self.next().await? {
456                Ok(_) => {}
457                Err(err) => return Some(err),
458            }
459        }
460    }
461}
462
463impl Stream for TraceBlockOpcodeGasStream<'_> {
464    type Item = TraceBlockOpCodeGasResult;
465
466    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
467        self.stream.as_mut().poll_next(cx)
468    }
469}
470
471impl std::fmt::Debug for TraceBlockOpcodeGasStream<'_> {
472    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
473        f.debug_struct("TraceBlockOpcodeGasStream").finish_non_exhaustive()
474    }
475}
476
477/// A utility to compare RPC responses from two different clients.
478///
479/// The `RpcComparer` is designed to perform comparisons between two RPC clients.
480/// It is useful in scenarios where there's a need to ensure that two different RPC clients
481/// return consistent responses. This can be particularly valuable in testing environments
482/// where one might want to compare a test client's responses against a production client
483/// or compare two different Ethereum client implementations.
484#[derive(Debug)]
485pub struct RpcComparer<C1, C2>
486where
487    C1: TraceApiExt,
488    C2: TraceApiExt,
489{
490    client1: C1,
491    client2: C2,
492}
493impl<C1, C2> RpcComparer<C1, C2>
494where
495    C1: TraceApiExt,
496    C2: TraceApiExt,
497{
498    /// Constructs a new `RpcComparer`.
499    ///
500    /// Initializes the comparer with two clients that will be used for fetching
501    /// and comparison.
502    ///
503    /// # Arguments
504    ///
505    /// * `client1` - The first RPC client.
506    /// * `client2` - The second RPC client.
507    pub const fn new(client1: C1, client2: C2) -> Self {
508        Self { client1, client2 }
509    }
510
511    /// Compares the `trace_block` responses from the two RPC clients.
512    ///
513    /// Fetches the `trace_block` responses for the provided block IDs from both clients
514    /// and compares them. If there are inconsistencies between the two responses, this
515    /// method will panic with a relevant message indicating the difference.
516    pub async fn compare_trace_block_responses(&self, block_ids: Vec<BlockId>) {
517        let stream1 = self.client1.trace_block_buffered(block_ids.clone(), 2);
518        let stream2 = self.client2.trace_block_buffered(block_ids, 2);
519
520        let mut zipped_streams = stream1.zip(stream2);
521
522        while let Some((result1, result2)) = zipped_streams.next().await {
523            match (result1, result2) {
524                (Ok((ref traces1_data, ref block1)), Ok((ref traces2_data, ref block2))) => {
525                    similar_asserts::assert_eq!(
526                        traces1_data,
527                        traces2_data,
528                        "Mismatch in traces for block: {:?}",
529                        block1
530                    );
531                    assert_eq!(block1, block2, "Mismatch in block ids.");
532                }
533                (Err((ref err1, ref block1)), Err((ref err2, ref block2))) => {
534                    assert_eq!(
535                        format!("{err1:?}"),
536                        format!("{err2:?}"),
537                        "Different errors for block: {block1:?}"
538                    );
539                    assert_eq!(block1, block2, "Mismatch in block ids.");
540                }
541                _ => panic!("One client returned Ok while the other returned Err."),
542            }
543        }
544    }
545
546    /// Compares the `replay_transactions` responses from the two RPC clients.
547    pub async fn compare_replay_transaction_responses(
548        &self,
549        transaction_hashes: Vec<TxHash>,
550        trace_types: HashSet<TraceType>,
551    ) {
552        let stream1 =
553            self.client1.replay_transactions(transaction_hashes.clone(), trace_types.clone());
554        let stream2 = self.client2.replay_transactions(transaction_hashes, trace_types);
555
556        let mut zipped_streams = stream1.zip(stream2);
557
558        while let Some((result1, result2)) = zipped_streams.next().await {
559            match (result1, result2) {
560                (Ok((ref trace1_data, ref tx_hash1)), Ok((ref trace2_data, ref tx_hash2))) => {
561                    similar_asserts::assert_eq!(
562                        trace1_data,
563                        trace2_data,
564                        "Mismatch in trace results for transaction: {tx_hash1:?}",
565                    );
566                    assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
567                }
568                (Err((ref err1, ref tx_hash1)), Err((ref err2, ref tx_hash2))) => {
569                    assert_eq!(
570                        format!("{err1:?}"),
571                        format!("{err2:?}"),
572                        "Different errors for transaction: {tx_hash1:?}",
573                    );
574                    assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
575                }
576                _ => panic!("One client returned Ok while the other returned Err."),
577            }
578        }
579    }
580}
581#[cfg(test)]
582mod tests {
583    use super::*;
584    use alloy_eips::BlockNumberOrTag;
585    use alloy_rpc_types_trace::filter::TraceFilterMode;
586    use futures::future::join;
587    use jsonrpsee::http_client::HttpClientBuilder;
588
589    const fn assert_is_stream<St: Stream>(_: &St) {}
590
591    #[tokio::test]
592    async fn can_create_block_stream() {
593        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
594        let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()];
595        let stream = client.trace_block_buffered(block, 2);
596        assert_is_stream(&stream);
597    }
598
599    #[tokio::test]
600    #[ignore]
601    async fn can_create_replay_transaction_stream() {
602        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
603
604        // Assuming you have some transactions you want to test, replace with actual hashes.
605        let transactions = vec![
606            "0x4e08fe36db723a338e852f89f613e606b0c9a17e649b18b01251f86236a2cef3".parse().unwrap(),
607            "0xea2817f1aeeb587b82f4ab87a6dbd3560fc35ed28de1be280cb40b2a24ab48bb".parse().unwrap(),
608        ];
609
610        let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
611
612        let mut stream = client.replay_transactions(transactions, trace_types);
613        let mut successes = 0;
614        let mut failures = 0;
615
616        assert_is_stream(&stream);
617
618        while let Some(result) = stream.next().await {
619            match result {
620                Ok((trace_result, tx_hash)) => {
621                    println!("Success for tx_hash {tx_hash:?}: {trace_result:?}");
622                    successes += 1;
623                }
624                Err((error, tx_hash)) => {
625                    println!("Error for tx_hash {tx_hash:?}: {error:?}");
626                    failures += 1;
627                }
628            }
629        }
630
631        println!("Total successes: {successes}");
632        println!("Total failures: {failures}");
633    }
634
635    #[tokio::test]
636    #[ignore]
637    async fn can_create_trace_call_many_stream() {
638        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
639
640        let call_request_1 = TransactionRequest::default();
641        let call_request_2 = TransactionRequest::default();
642        let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
643        let calls = vec![(call_request_1, trace_types.clone()), (call_request_2, trace_types)];
644
645        let mut stream = client.trace_call_many_stream(calls, None);
646
647        assert_is_stream(&stream);
648
649        while let Some(result) = stream.next().await {
650            match result {
651                Ok(trace_result) => {
652                    println!("Success: {trace_result:?}");
653                }
654                Err(error) => {
655                    println!("Error: {error:?}");
656                }
657            }
658        }
659    }
660    #[tokio::test]
661    #[ignore]
662    async fn can_create_trace_get_stream() {
663        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
664
665        let tx_hash: B256 = "".parse().unwrap();
666
667        let indices: Vec<Index> = vec![Index::from(0)];
668
669        let mut stream = client.trace_get_stream(tx_hash, indices);
670
671        while let Some(result) = stream.next().await {
672            match result {
673                Ok(trace) => {
674                    println!("Received trace: {trace:?}");
675                }
676                Err(e) => {
677                    println!("Error fetching trace: {e:?}");
678                }
679            }
680        }
681    }
682
683    #[tokio::test]
684    #[ignore]
685    async fn can_create_trace_filter() {
686        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
687
688        let filter = TraceFilter {
689            from_block: None,
690            to_block: None,
691            from_address: Vec::new(),
692            to_address: Vec::new(),
693            mode: TraceFilterMode::Union,
694            after: None,
695            count: None,
696        };
697
698        let filters = vec![filter];
699        let mut stream = client.trace_filter_stream(filters);
700
701        while let Some(result) = stream.next().await {
702            match result {
703                Ok(trace) => {
704                    println!("Received trace: {trace:?}");
705                }
706                Err(e) => {
707                    println!("Error fetching trace: {e:?}");
708                }
709            }
710        }
711    }
712
713    #[tokio::test]
714    #[ignore]
715    async fn can_create_trace_call_stream() {
716        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
717
718        let trace_call_request = TraceCallRequest::default();
719
720        let mut stream = client.trace_call_stream(trace_call_request);
721        let mut successes = 0;
722        let mut failures = 0;
723
724        assert_is_stream(&stream);
725
726        while let Some(result) = stream.next().await {
727            match result {
728                Ok(trace_result) => {
729                    println!("Success: {trace_result:?}");
730                    successes += 1;
731                }
732                Err((error, request)) => {
733                    println!("Error for request {request:?}: {error:?}");
734                    failures += 1;
735                }
736            }
737        }
738
739        println!("Total successes: {successes}");
740        println!("Total failures: {failures}");
741    }
742
743    #[tokio::test]
744    #[ignore]
745    async fn block_opcode_gas_stream() {
746        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
747        let block = vec![BlockNumberOrTag::Latest];
748        let mut stream = client.trace_block_opcode_gas_unordered(block, 2);
749        assert_is_stream(&stream);
750        let _opcodes = stream.next().await.unwrap();
751    }
752
753    #[tokio::test(flavor = "multi_thread")]
754    #[ignore]
755    async fn compare_block_stream() {
756        let client_a = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
757        let client_b = HttpClientBuilder::default().build("http://localhost:8544").unwrap();
758        let blocks = 0u64..=1681464;
759        let mut stream_a = client_a.trace_block_buffered(blocks.clone(), 2);
760        let mut stream_b = client_b.trace_block_buffered(blocks, 2);
761
762        let mut count = 0;
763        loop {
764            let (res_a, res_b) = join(stream_a.next(), stream_b.next()).await;
765
766            if res_a.is_none() && res_b.is_none() {
767                break;
768            }
769
770            match (res_a, res_b) {
771                (Some(Ok(res_a)), Some(Ok(res_b))) => {
772                    if res_a != res_b {
773                        println!("Received different trace results: {res_a:?}, res_b: {res_b:?}");
774                    }
775                }
776                (res_a, res_b) => {
777                    println!("Received different responses: {res_a:?}, res_b: {res_b:?}");
778                }
779            }
780
781            if count % 1000 == 0 {
782                println!("Blocks traced: {count}");
783            }
784
785            count += 1;
786        }
787        println!("Total blocks traced: {count}");
788    }
789}