reth_rpc_api_testing_util/
trace.rs

1//! Helpers for testing trace calls.
2
3use alloy_eips::BlockId;
4use alloy_primitives::{map::HashSet, Bytes, TxHash, B256};
5use alloy_rpc_types_eth::{transaction::TransactionRequest, Index};
6use alloy_rpc_types_trace::{
7    filter::TraceFilter,
8    opcode::BlockOpcodeGas,
9    parity::{LocalizedTransactionTrace, TraceResults, TraceType},
10    tracerequest::TraceCallRequest,
11};
12use futures::{Stream, StreamExt};
13use jsonrpsee::core::client::Error as RpcError;
14use reth_rpc_api::clients::TraceApiClient;
15use std::{
16    pin::Pin,
17    task::{Context, Poll},
18};
19
20/// A type alias that represents the result of a raw transaction trace stream.
21type RawTransactionTraceResult<'a> =
22    Pin<Box<dyn Stream<Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>> + 'a>>;
23
24/// A result type for the `trace_block` method that also captures the requested block.
25pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
26
27/// A result type for the `trace_blockOpcodeGas` method that also captures the requested block.
28pub type TraceBlockOpCodeGasResult = Result<(BlockOpcodeGas, BlockId), (RpcError, BlockId)>;
29
30/// Type alias representing the result of replaying a transaction.
31pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
32
33/// A type representing the result of calling `trace_call_many` method.
34pub type CallManyTraceResult = Result<
35    (Vec<TraceResults>, Vec<(TransactionRequest, HashSet<TraceType>)>),
36    (RpcError, Vec<(TransactionRequest, HashSet<TraceType>)>),
37>;
38
39/// Result type for the `trace_get` method that also captures the requested transaction hash and
40/// index.
41pub type TraceGetResult =
42    Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
43
44/// Represents a result type for the `trace_filter` stream extension.
45pub type TraceFilterResult =
46    Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;
47
48/// Represents the result of a single trace call.
49pub type TraceCallResult = Result<TraceResults, (RpcError, TraceCallRequest)>;
50
51/// An extension trait for the Trace API.
52pub trait TraceApiExt {
53    /// The provider type that is used to make the requests.
54    type Provider;
55
56    /// Returns a new stream that yields the traces for the given blocks.
57    ///
58    /// See also [`StreamExt::buffered`].
59    fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
60    where
61        I: IntoIterator<Item = B>,
62        B: Into<BlockId>;
63
64    /// Returns a new stream that yields the traces for the given blocks.
65    ///
66    /// See also [`StreamExt::buffer_unordered`].
67    fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
68    where
69        I: IntoIterator<Item = B>,
70        B: Into<BlockId>;
71
72    /// Returns a new stream that yields the traces the opcodes for the given blocks.
73    ///
74    /// See also [`StreamExt::buffered`].
75    fn trace_block_opcode_gas_unordered<I, B>(
76        &self,
77        params: I,
78        n: usize,
79    ) -> TraceBlockOpcodeGasStream<'_>
80    where
81        I: IntoIterator<Item = B>,
82        B: Into<BlockId>;
83
84    /// Returns a new stream that replays the transactions for the given transaction hashes.
85    ///
86    /// This returns all results in order.
87    fn replay_transactions<I>(
88        &self,
89        tx_hashes: I,
90        trace_types: HashSet<TraceType>,
91    ) -> ReplayTransactionStream<'_>
92    where
93        I: IntoIterator<Item = TxHash>;
94
95    /// Returns a new stream that traces the provided raw transaction data.
96    fn trace_raw_transaction_stream(
97        &self,
98        data: Bytes,
99        trace_types: HashSet<TraceType>,
100        block_id: Option<BlockId>,
101    ) -> RawTransactionTraceStream<'_>;
102
103    /// Creates a stream of results for multiple dependent transaction calls on top of the same
104    /// block.
105    fn trace_call_many_stream<I>(
106        &self,
107        calls: I,
108        block_id: Option<BlockId>,
109    ) -> CallManyTraceStream<'_>
110    where
111        I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>;
112
113    /// Returns a new stream that yields the traces for the given transaction hash and indices.
114    fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
115    where
116        I: IntoIterator<Item = Index>;
117
118    /// Returns a new stream that yields traces for given filters.
119    fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
120    where
121        I: IntoIterator<Item = TraceFilter>;
122
123    /// Returns a new stream that yields the trace results for the given call requests.
124    fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_>;
125}
126/// `TraceCallStream` provides an asynchronous stream of tracing results.
127#[must_use = "streams do nothing unless polled"]
128pub struct TraceCallStream<'a> {
129    stream: Pin<Box<dyn Stream<Item = TraceCallResult> + 'a>>,
130}
131
132impl Stream for TraceCallStream<'_> {
133    type Item = TraceCallResult;
134
135    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        self.stream.as_mut().poll_next(cx)
137    }
138}
139
140impl std::fmt::Debug for TraceCallStream<'_> {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("TraceCallStream").finish()
143    }
144}
145
146/// Represents a stream that asynchronously yields the results of the `trace_filter` method.
147#[must_use = "streams do nothing unless polled"]
148pub struct TraceFilterStream<'a> {
149    stream: Pin<Box<dyn Stream<Item = TraceFilterResult> + 'a>>,
150}
151
152impl Stream for TraceFilterStream<'_> {
153    type Item = TraceFilterResult;
154
155    /// Attempts to pull out the next value of the stream.
156    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157        self.stream.as_mut().poll_next(cx)
158    }
159}
160
161impl std::fmt::Debug for TraceFilterStream<'_> {
162    /// Provides a debug representation of the `TraceFilterStream`.
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("TraceFilterStream").finish_non_exhaustive()
165    }
166}
167
168/// A stream that asynchronously yields the results of the `trace_get` method for a given
169/// transaction hash and a series of indices.
170#[must_use = "streams do nothing unless polled"]
171pub struct TraceGetStream<'a> {
172    stream: Pin<Box<dyn Stream<Item = TraceGetResult> + 'a>>,
173}
174
175impl Stream for TraceGetStream<'_> {
176    type Item = TraceGetResult;
177
178    /// Attempts to pull out the next item of the stream
179    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180        self.stream.as_mut().poll_next(cx)
181    }
182}
183
184impl std::fmt::Debug for TraceGetStream<'_> {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("TraceGetStream").finish_non_exhaustive()
187    }
188}
189
190/// A stream that provides asynchronous iteration over results from the `trace_call_many` function.
191///
192/// The stream yields items of type `CallManyTraceResult`.
193#[must_use = "streams do nothing unless polled"]
194pub struct CallManyTraceStream<'a> {
195    stream: Pin<Box<dyn Stream<Item = CallManyTraceResult> + 'a>>,
196}
197
198impl Stream for CallManyTraceStream<'_> {
199    type Item = CallManyTraceResult;
200
201    /// Polls for the next item from the stream.
202    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203        self.stream.as_mut().poll_next(cx)
204    }
205}
206
207impl std::fmt::Debug for CallManyTraceStream<'_> {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        f.debug_struct("CallManyTraceStream").finish()
210    }
211}
212
213/// A stream that traces the provided raw transaction data.
214#[must_use = "streams do nothing unless polled"]
215pub struct RawTransactionTraceStream<'a> {
216    stream: RawTransactionTraceResult<'a>,
217}
218
219impl Stream for RawTransactionTraceStream<'_> {
220    type Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>;
221
222    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223        self.stream.as_mut().poll_next(cx)
224    }
225}
226
227impl std::fmt::Debug for RawTransactionTraceStream<'_> {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        f.debug_struct("RawTransactionTraceStream").finish()
230    }
231}
232
233/// A stream that replays the transactions for the requested hashes.
234#[must_use = "streams do nothing unless polled"]
235pub struct ReplayTransactionStream<'a> {
236    stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
237}
238
239impl Stream for ReplayTransactionStream<'_> {
240    type Item = ReplayTransactionResult;
241
242    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243        self.stream.as_mut().poll_next(cx)
244    }
245}
246
247impl std::fmt::Debug for ReplayTransactionStream<'_> {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        f.debug_struct("ReplayTransactionStream").finish()
250    }
251}
252
253impl<T: TraceApiClient + Sync> TraceApiExt for T {
254    type Provider = T;
255
256    fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
257    where
258        I: IntoIterator<Item = B>,
259        B: Into<BlockId>,
260    {
261        let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
262        let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
263            match self.trace_block(block).await {
264                Ok(result) => Ok((result.unwrap_or_default(), block)),
265                Err(err) => Err((err, block)),
266            }
267        }))
268        .buffered(n);
269        TraceBlockStream { stream: Box::pin(stream) }
270    }
271
272    fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
273    where
274        I: IntoIterator<Item = B>,
275        B: Into<BlockId>,
276    {
277        let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
278        let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
279            match self.trace_block(block).await {
280                Ok(result) => Ok((result.unwrap_or_default(), block)),
281                Err(err) => Err((err, block)),
282            }
283        }))
284        .buffer_unordered(n);
285        TraceBlockStream { stream: Box::pin(stream) }
286    }
287
288    fn trace_block_opcode_gas_unordered<I, B>(
289        &self,
290        params: I,
291        n: usize,
292    ) -> TraceBlockOpcodeGasStream<'_>
293    where
294        I: IntoIterator<Item = B>,
295        B: Into<BlockId>,
296    {
297        let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
298        let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
299            match self.trace_block_opcode_gas(block).await {
300                Ok(result) => Ok((result.unwrap(), block)),
301                Err(err) => Err((err, block)),
302            }
303        }))
304        .buffered(n);
305        TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
306    }
307
308    fn replay_transactions<I>(
309        &self,
310        tx_hashes: I,
311        trace_types: HashSet<TraceType>,
312    ) -> ReplayTransactionStream<'_>
313    where
314        I: IntoIterator<Item = TxHash>,
315    {
316        let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
317        let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
318            let trace_types_clone = trace_types.clone(); // Clone outside of the async block
319            async move {
320                match self.replay_transaction(hash, trace_types_clone).await {
321                    Ok(result) => Ok((result, hash)),
322                    Err(err) => Err((err, hash)),
323                }
324            }
325        }))
326        .buffered(10);
327        ReplayTransactionStream { stream: Box::pin(stream) }
328    }
329
330    fn trace_raw_transaction_stream(
331        &self,
332        data: Bytes,
333        trace_types: HashSet<TraceType>,
334        block_id: Option<BlockId>,
335    ) -> RawTransactionTraceStream<'_> {
336        let stream = futures::stream::once(async move {
337            match self.trace_raw_transaction(data.clone(), trace_types, block_id).await {
338                Ok(result) => Ok((result, data)),
339                Err(err) => Err((err, data)),
340            }
341        });
342        RawTransactionTraceStream { stream: Box::pin(stream) }
343    }
344
345    fn trace_call_many_stream<I>(
346        &self,
347        calls: I,
348        block_id: Option<BlockId>,
349    ) -> CallManyTraceStream<'_>
350    where
351        I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>,
352    {
353        let call_set = calls.into_iter().collect::<Vec<_>>();
354        let stream = futures::stream::once(async move {
355            match self.trace_call_many(call_set.clone(), block_id).await {
356                Ok(results) => Ok((results, call_set)),
357                Err(err) => Err((err, call_set)),
358            }
359        });
360        CallManyTraceStream { stream: Box::pin(stream) }
361    }
362
363    fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
364    where
365        I: IntoIterator<Item = Index>,
366    {
367        let index_list = indices.into_iter().collect::<Vec<_>>();
368        let stream = futures::stream::iter(index_list.into_iter().map(move |index| async move {
369            match self.trace_get(hash, vec![index]).await {
370                Ok(result) => Ok((result, hash, vec![index])),
371                Err(err) => Err((err, hash, vec![index])),
372            }
373        }))
374        .buffered(10);
375        TraceGetStream { stream: Box::pin(stream) }
376    }
377
378    fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
379    where
380        I: IntoIterator<Item = TraceFilter>,
381    {
382        let filter_list = filters.into_iter().collect::<Vec<_>>();
383        let stream = futures::stream::iter(filter_list.into_iter().map(move |filter| async move {
384            match self.trace_filter(filter.clone()).await {
385                Ok(result) => Ok((result, filter)),
386                Err(err) => Err((err, filter)),
387            }
388        }))
389        .buffered(10);
390        TraceFilterStream { stream: Box::pin(stream) }
391    }
392
393    fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_> {
394        let stream = futures::stream::once(async move {
395            match self
396                .trace_call(
397                    request.call.clone(),
398                    request.trace_types.clone(),
399                    request.block_id,
400                    request.state_overrides.clone(),
401                    request.block_overrides.clone(),
402                )
403                .await
404            {
405                Ok(result) => Ok(result),
406                Err(err) => Err((err, request)),
407            }
408        });
409        TraceCallStream { stream: Box::pin(stream) }
410    }
411}
412
413/// A stream that yields the traces for the requested blocks.
414#[must_use = "streams do nothing unless polled"]
415pub struct TraceBlockStream<'a> {
416    stream: Pin<Box<dyn Stream<Item = TraceBlockResult> + 'a>>,
417}
418
419impl TraceBlockStream<'_> {
420    /// Returns the next error result of the stream.
421    pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
422        loop {
423            match self.next().await? {
424                Ok(_) => {}
425                Err(err) => return Some(err),
426            }
427        }
428    }
429}
430
431impl Stream for TraceBlockStream<'_> {
432    type Item = TraceBlockResult;
433
434    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
435        self.stream.as_mut().poll_next(cx)
436    }
437}
438
439impl std::fmt::Debug for TraceBlockStream<'_> {
440    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441        f.debug_struct("TraceBlockStream").finish_non_exhaustive()
442    }
443}
444
445/// A stream that yields the opcodes for the requested blocks.
446#[must_use = "streams do nothing unless polled"]
447pub struct TraceBlockOpcodeGasStream<'a> {
448    stream: Pin<Box<dyn Stream<Item = TraceBlockOpCodeGasResult> + 'a>>,
449}
450
451impl TraceBlockOpcodeGasStream<'_> {
452    /// Returns the next error result of the stream.
453    pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
454        loop {
455            match self.next().await? {
456                Ok(_) => {}
457                Err(err) => return Some(err),
458            }
459        }
460    }
461}
462
463impl Stream for TraceBlockOpcodeGasStream<'_> {
464    type Item = TraceBlockOpCodeGasResult;
465
466    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
467        self.stream.as_mut().poll_next(cx)
468    }
469}
470
471impl std::fmt::Debug for TraceBlockOpcodeGasStream<'_> {
472    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
473        f.debug_struct("TraceBlockOpcodeGasStream").finish_non_exhaustive()
474    }
475}
476
477/// A utility to compare RPC responses from two different clients.
478///
479/// The `RpcComparer` is designed to perform comparisons between two RPC clients.
480/// It is useful in scenarios where there's a need to ensure that two different RPC clients
481/// return consistent responses. This can be particularly valuable in testing environments
482/// where one might want to compare a test client's responses against a production client
483/// or compare two different Ethereum client implementations.
484#[derive(Debug)]
485pub struct RpcComparer<C1, C2>
486where
487    C1: TraceApiExt,
488    C2: TraceApiExt,
489{
490    client1: C1,
491    client2: C2,
492}
493impl<C1, C2> RpcComparer<C1, C2>
494where
495    C1: TraceApiExt,
496    C2: TraceApiExt,
497{
498    /// Constructs a new `RpcComparer`.
499    ///
500    /// Initializes the comparer with two clients that will be used for fetching
501    /// and comparison.
502    ///
503    /// # Arguments
504    ///
505    /// * `client1` - The first RPC client.
506    /// * `client2` - The second RPC client.
507    pub const fn new(client1: C1, client2: C2) -> Self {
508        Self { client1, client2 }
509    }
510
511    /// Compares the `trace_block` responses from the two RPC clients.
512    ///
513    /// Fetches the `trace_block` responses for the provided block IDs from both clients
514    /// and compares them. If there are inconsistencies between the two responses, this
515    /// method will panic with a relevant message indicating the difference.
516    pub async fn compare_trace_block_responses(&self, block_ids: Vec<BlockId>) {
517        let stream1 = self.client1.trace_block_buffered(block_ids.clone(), 2);
518        let stream2 = self.client2.trace_block_buffered(block_ids, 2);
519
520        let mut zipped_streams = stream1.zip(stream2);
521
522        while let Some((result1, result2)) = zipped_streams.next().await {
523            match (result1, result2) {
524                (Ok((ref traces1_data, ref block1)), Ok((ref traces2_data, ref block2))) => {
525                    similar_asserts::assert_eq!(
526                        traces1_data,
527                        traces2_data,
528                        "Mismatch in traces for block: {:?}",
529                        block1
530                    );
531                    assert_eq!(block1, block2, "Mismatch in block ids.");
532                }
533                (Err((ref err1, ref block1)), Err((ref err2, ref block2))) => {
534                    assert_eq!(
535                        format!("{err1:?}"),
536                        format!("{err2:?}"),
537                        "Different errors for block: {block1:?}"
538                    );
539                    assert_eq!(block1, block2, "Mismatch in block ids.");
540                }
541                _ => panic!("One client returned Ok while the other returned Err."),
542            }
543        }
544    }
545
546    /// Compares the `replay_transactions` responses from the two RPC clients.
547    pub async fn compare_replay_transaction_responses(
548        &self,
549        transaction_hashes: Vec<TxHash>,
550        trace_types: HashSet<TraceType>,
551    ) {
552        let stream1 =
553            self.client1.replay_transactions(transaction_hashes.clone(), trace_types.clone());
554        let stream2 = self.client2.replay_transactions(transaction_hashes, trace_types);
555
556        let mut zipped_streams = stream1.zip(stream2);
557
558        while let Some((result1, result2)) = zipped_streams.next().await {
559            match (result1, result2) {
560                (Ok((ref trace1_data, ref tx_hash1)), Ok((ref trace2_data, ref tx_hash2))) => {
561                    similar_asserts::assert_eq!(
562                        trace1_data,
563                        trace2_data,
564                        "Mismatch in trace results for transaction: {tx_hash1:?}",
565                    );
566                    assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
567                }
568                (Err((ref err1, ref tx_hash1)), Err((ref err2, ref tx_hash2))) => {
569                    assert_eq!(
570                        format!("{err1:?}"),
571                        format!("{err2:?}"),
572                        "Different errors for transaction: {tx_hash1:?}",
573                    );
574                    assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
575                }
576                _ => panic!("One client returned Ok while the other returned Err."),
577            }
578        }
579    }
580}
581#[cfg(test)]
582mod tests {
583    use super::*;
584    use alloy_eips::BlockNumberOrTag;
585    use alloy_rpc_types_trace::filter::TraceFilterMode;
586    use jsonrpsee::http_client::HttpClientBuilder;
587
588    const fn assert_is_stream<St: Stream>(_: &St) {}
589
590    #[tokio::test]
591    async fn can_create_block_stream() {
592        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
593        let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()];
594        let stream = client.trace_block_buffered(block, 2);
595        assert_is_stream(&stream);
596    }
597
598    #[tokio::test]
599    #[ignore]
600    async fn can_create_replay_transaction_stream() {
601        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
602
603        // Assuming you have some transactions you want to test, replace with actual hashes.
604        let transactions = vec![
605            "0x4e08fe36db723a338e852f89f613e606b0c9a17e649b18b01251f86236a2cef3".parse().unwrap(),
606            "0xea2817f1aeeb587b82f4ab87a6dbd3560fc35ed28de1be280cb40b2a24ab48bb".parse().unwrap(),
607        ];
608
609        let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
610
611        let mut stream = client.replay_transactions(transactions, trace_types);
612        let mut successes = 0;
613        let mut failures = 0;
614
615        assert_is_stream(&stream);
616
617        while let Some(result) = stream.next().await {
618            match result {
619                Ok((trace_result, tx_hash)) => {
620                    println!("Success for tx_hash {tx_hash:?}: {trace_result:?}");
621                    successes += 1;
622                }
623                Err((error, tx_hash)) => {
624                    println!("Error for tx_hash {tx_hash:?}: {error:?}");
625                    failures += 1;
626                }
627            }
628        }
629
630        println!("Total successes: {successes}");
631        println!("Total failures: {failures}");
632    }
633
634    #[tokio::test]
635    #[ignore]
636    async fn can_create_trace_call_many_stream() {
637        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
638
639        let call_request_1 = TransactionRequest::default();
640        let call_request_2 = TransactionRequest::default();
641        let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
642        let calls = vec![(call_request_1, trace_types.clone()), (call_request_2, trace_types)];
643
644        let mut stream = client.trace_call_many_stream(calls, None);
645
646        assert_is_stream(&stream);
647
648        while let Some(result) = stream.next().await {
649            match result {
650                Ok(trace_result) => {
651                    println!("Success: {trace_result:?}");
652                }
653                Err(error) => {
654                    println!("Error: {error:?}");
655                }
656            }
657        }
658    }
659    #[tokio::test]
660    #[ignore]
661    async fn can_create_trace_get_stream() {
662        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
663
664        let tx_hash: B256 = "".parse().unwrap();
665
666        let indices: Vec<Index> = vec![Index::from(0)];
667
668        let mut stream = client.trace_get_stream(tx_hash, indices);
669
670        while let Some(result) = stream.next().await {
671            match result {
672                Ok(trace) => {
673                    println!("Received trace: {trace:?}");
674                }
675                Err(e) => {
676                    println!("Error fetching trace: {e:?}");
677                }
678            }
679        }
680    }
681
682    #[tokio::test]
683    #[ignore]
684    async fn can_create_trace_filter() {
685        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
686
687        let filter = TraceFilter {
688            from_block: None,
689            to_block: None,
690            from_address: Vec::new(),
691            to_address: Vec::new(),
692            mode: TraceFilterMode::Union,
693            after: None,
694            count: None,
695        };
696
697        let filters = vec![filter];
698        let mut stream = client.trace_filter_stream(filters);
699
700        while let Some(result) = stream.next().await {
701            match result {
702                Ok(trace) => {
703                    println!("Received trace: {trace:?}");
704                }
705                Err(e) => {
706                    println!("Error fetching trace: {e:?}");
707                }
708            }
709        }
710    }
711
712    #[tokio::test]
713    #[ignore]
714    async fn can_create_trace_call_stream() {
715        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
716
717        let trace_call_request = TraceCallRequest::default();
718
719        let mut stream = client.trace_call_stream(trace_call_request);
720        let mut successes = 0;
721        let mut failures = 0;
722
723        assert_is_stream(&stream);
724
725        while let Some(result) = stream.next().await {
726            match result {
727                Ok(trace_result) => {
728                    println!("Success: {trace_result:?}");
729                    successes += 1;
730                }
731                Err((error, request)) => {
732                    println!("Error for request {request:?}: {error:?}");
733                    failures += 1;
734                }
735            }
736        }
737
738        println!("Total successes: {successes}");
739        println!("Total failures: {failures}");
740    }
741
742    #[tokio::test]
743    #[ignore]
744    async fn block_opcode_gas_stream() {
745        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
746        let block = vec![BlockNumberOrTag::Latest];
747        let mut stream = client.trace_block_opcode_gas_unordered(block, 2);
748        assert_is_stream(&stream);
749        let _opcodes = stream.next().await.unwrap();
750    }
751}