1use alloy_eips::BlockId;
4use alloy_primitives::{map::HashSet, Bytes, TxHash, B256};
5use alloy_rpc_types_eth::{transaction::TransactionRequest, Index};
6use alloy_rpc_types_trace::{
7 filter::TraceFilter,
8 opcode::BlockOpcodeGas,
9 parity::{LocalizedTransactionTrace, TraceResults, TraceType},
10 tracerequest::TraceCallRequest,
11};
12use futures::{Stream, StreamExt};
13use jsonrpsee::core::client::Error as RpcError;
14use reth_rpc_api::clients::TraceApiClient;
15use std::{
16 pin::Pin,
17 task::{Context, Poll},
18};
19
20type RawTransactionTraceResult<'a> =
22 Pin<Box<dyn Stream<Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>> + 'a>>;
23
24pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
26
27pub type TraceBlockOpCodeGasResult = Result<(BlockOpcodeGas, BlockId), (RpcError, BlockId)>;
29
30pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
32
33pub type CallManyTraceResult = Result<
35 (Vec<TraceResults>, Vec<(TransactionRequest, HashSet<TraceType>)>),
36 (RpcError, Vec<(TransactionRequest, HashSet<TraceType>)>),
37>;
38
39pub type TraceGetResult =
42 Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
43
44pub type TraceFilterResult =
46 Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;
47
48pub type TraceCallResult = Result<TraceResults, (RpcError, TraceCallRequest)>;
50
51pub trait TraceApiExt {
53 type Provider;
55
56 fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
60 where
61 I: IntoIterator<Item = B>,
62 B: Into<BlockId>;
63
64 fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
68 where
69 I: IntoIterator<Item = B>,
70 B: Into<BlockId>;
71
72 fn trace_block_opcode_gas_unordered<I, B>(
76 &self,
77 params: I,
78 n: usize,
79 ) -> TraceBlockOpcodeGasStream<'_>
80 where
81 I: IntoIterator<Item = B>,
82 B: Into<BlockId>;
83
84 fn replay_transactions<I>(
88 &self,
89 tx_hashes: I,
90 trace_types: HashSet<TraceType>,
91 ) -> ReplayTransactionStream<'_>
92 where
93 I: IntoIterator<Item = TxHash>;
94
95 fn trace_raw_transaction_stream(
97 &self,
98 data: Bytes,
99 trace_types: HashSet<TraceType>,
100 block_id: Option<BlockId>,
101 ) -> RawTransactionTraceStream<'_>;
102
103 fn trace_call_many_stream<I>(
106 &self,
107 calls: I,
108 block_id: Option<BlockId>,
109 ) -> CallManyTraceStream<'_>
110 where
111 I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>;
112
113 fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
115 where
116 I: IntoIterator<Item = Index>;
117
118 fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
120 where
121 I: IntoIterator<Item = TraceFilter>;
122
123 fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_>;
125}
126#[must_use = "streams do nothing unless polled"]
128pub struct TraceCallStream<'a> {
129 stream: Pin<Box<dyn Stream<Item = TraceCallResult> + 'a>>,
130}
131
132impl Stream for TraceCallStream<'_> {
133 type Item = TraceCallResult;
134
135 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 self.stream.as_mut().poll_next(cx)
137 }
138}
139
140impl std::fmt::Debug for TraceCallStream<'_> {
141 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 f.debug_struct("TraceCallStream").finish()
143 }
144}
145
146#[must_use = "streams do nothing unless polled"]
148pub struct TraceFilterStream<'a> {
149 stream: Pin<Box<dyn Stream<Item = TraceFilterResult> + 'a>>,
150}
151
152impl Stream for TraceFilterStream<'_> {
153 type Item = TraceFilterResult;
154
155 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157 self.stream.as_mut().poll_next(cx)
158 }
159}
160
161impl std::fmt::Debug for TraceFilterStream<'_> {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("TraceFilterStream").finish_non_exhaustive()
165 }
166}
167
168#[must_use = "streams do nothing unless polled"]
171pub struct TraceGetStream<'a> {
172 stream: Pin<Box<dyn Stream<Item = TraceGetResult> + 'a>>,
173}
174
175impl Stream for TraceGetStream<'_> {
176 type Item = TraceGetResult;
177
178 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
180 self.stream.as_mut().poll_next(cx)
181 }
182}
183
184impl std::fmt::Debug for TraceGetStream<'_> {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct("TraceGetStream").finish_non_exhaustive()
187 }
188}
189
190#[must_use = "streams do nothing unless polled"]
194pub struct CallManyTraceStream<'a> {
195 stream: Pin<Box<dyn Stream<Item = CallManyTraceResult> + 'a>>,
196}
197
198impl Stream for CallManyTraceStream<'_> {
199 type Item = CallManyTraceResult;
200
201 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203 self.stream.as_mut().poll_next(cx)
204 }
205}
206
207impl std::fmt::Debug for CallManyTraceStream<'_> {
208 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209 f.debug_struct("CallManyTraceStream").finish()
210 }
211}
212
213#[must_use = "streams do nothing unless polled"]
215pub struct RawTransactionTraceStream<'a> {
216 stream: RawTransactionTraceResult<'a>,
217}
218
219impl Stream for RawTransactionTraceStream<'_> {
220 type Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>;
221
222 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
223 self.stream.as_mut().poll_next(cx)
224 }
225}
226
227impl std::fmt::Debug for RawTransactionTraceStream<'_> {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 f.debug_struct("RawTransactionTraceStream").finish()
230 }
231}
232
233#[must_use = "streams do nothing unless polled"]
235pub struct ReplayTransactionStream<'a> {
236 stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
237}
238
239impl Stream for ReplayTransactionStream<'_> {
240 type Item = ReplayTransactionResult;
241
242 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243 self.stream.as_mut().poll_next(cx)
244 }
245}
246
247impl std::fmt::Debug for ReplayTransactionStream<'_> {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 f.debug_struct("ReplayTransactionStream").finish()
250 }
251}
252
253impl<T: TraceApiClient<TransactionRequest> + Sync> TraceApiExt for T {
254 type Provider = T;
255
256 fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
257 where
258 I: IntoIterator<Item = B>,
259 B: Into<BlockId>,
260 {
261 let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
262 let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
263 match self.trace_block(block).await {
264 Ok(result) => Ok((result.unwrap_or_default(), block)),
265 Err(err) => Err((err, block)),
266 }
267 }))
268 .buffered(n);
269 TraceBlockStream { stream: Box::pin(stream) }
270 }
271
272 fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
273 where
274 I: IntoIterator<Item = B>,
275 B: Into<BlockId>,
276 {
277 let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
278 let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
279 match self.trace_block(block).await {
280 Ok(result) => Ok((result.unwrap_or_default(), block)),
281 Err(err) => Err((err, block)),
282 }
283 }))
284 .buffer_unordered(n);
285 TraceBlockStream { stream: Box::pin(stream) }
286 }
287
288 fn trace_block_opcode_gas_unordered<I, B>(
289 &self,
290 params: I,
291 n: usize,
292 ) -> TraceBlockOpcodeGasStream<'_>
293 where
294 I: IntoIterator<Item = B>,
295 B: Into<BlockId>,
296 {
297 let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
298 let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
299 match self.trace_block_opcode_gas(block).await {
300 Ok(result) => Ok((result.unwrap(), block)),
301 Err(err) => Err((err, block)),
302 }
303 }))
304 .buffered(n);
305 TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
306 }
307
308 fn replay_transactions<I>(
309 &self,
310 tx_hashes: I,
311 trace_types: HashSet<TraceType>,
312 ) -> ReplayTransactionStream<'_>
313 where
314 I: IntoIterator<Item = TxHash>,
315 {
316 let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
317 let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
318 let trace_types_clone = trace_types.clone(); async move {
320 match self.replay_transaction(hash, trace_types_clone).await {
321 Ok(result) => Ok((result, hash)),
322 Err(err) => Err((err, hash)),
323 }
324 }
325 }))
326 .buffered(10);
327 ReplayTransactionStream { stream: Box::pin(stream) }
328 }
329
330 fn trace_raw_transaction_stream(
331 &self,
332 data: Bytes,
333 trace_types: HashSet<TraceType>,
334 block_id: Option<BlockId>,
335 ) -> RawTransactionTraceStream<'_> {
336 let stream = futures::stream::once(async move {
337 match self.trace_raw_transaction(data.clone(), trace_types, block_id).await {
338 Ok(result) => Ok((result, data)),
339 Err(err) => Err((err, data)),
340 }
341 });
342 RawTransactionTraceStream { stream: Box::pin(stream) }
343 }
344
345 fn trace_call_many_stream<I>(
346 &self,
347 calls: I,
348 block_id: Option<BlockId>,
349 ) -> CallManyTraceStream<'_>
350 where
351 I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>,
352 {
353 let call_set = calls.into_iter().collect::<Vec<_>>();
354 let stream = futures::stream::once(async move {
355 match self.trace_call_many(call_set.clone(), block_id).await {
356 Ok(results) => Ok((results, call_set)),
357 Err(err) => Err((err, call_set)),
358 }
359 });
360 CallManyTraceStream { stream: Box::pin(stream) }
361 }
362
363 fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
364 where
365 I: IntoIterator<Item = Index>,
366 {
367 let index_list = indices.into_iter().collect::<Vec<_>>();
368 let stream = futures::stream::iter(index_list.into_iter().map(move |index| async move {
369 match self.trace_get(hash, vec![index]).await {
370 Ok(result) => Ok((result, hash, vec![index])),
371 Err(err) => Err((err, hash, vec![index])),
372 }
373 }))
374 .buffered(10);
375 TraceGetStream { stream: Box::pin(stream) }
376 }
377
378 fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
379 where
380 I: IntoIterator<Item = TraceFilter>,
381 {
382 let filter_list = filters.into_iter().collect::<Vec<_>>();
383 let stream = futures::stream::iter(filter_list.into_iter().map(move |filter| async move {
384 match self.trace_filter(filter.clone()).await {
385 Ok(result) => Ok((result, filter)),
386 Err(err) => Err((err, filter)),
387 }
388 }))
389 .buffered(10);
390 TraceFilterStream { stream: Box::pin(stream) }
391 }
392
393 fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_> {
394 let stream = futures::stream::once(async move {
395 match self
396 .trace_call(
397 request.call.clone(),
398 request.trace_types.clone(),
399 request.block_id,
400 request.state_overrides.clone(),
401 request.block_overrides.clone(),
402 )
403 .await
404 {
405 Ok(result) => Ok(result),
406 Err(err) => Err((err, request)),
407 }
408 });
409 TraceCallStream { stream: Box::pin(stream) }
410 }
411}
412
413#[must_use = "streams do nothing unless polled"]
415pub struct TraceBlockStream<'a> {
416 stream: Pin<Box<dyn Stream<Item = TraceBlockResult> + 'a>>,
417}
418
419impl TraceBlockStream<'_> {
420 pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
422 loop {
423 match self.next().await? {
424 Ok(_) => {}
425 Err(err) => return Some(err),
426 }
427 }
428 }
429}
430
431impl Stream for TraceBlockStream<'_> {
432 type Item = TraceBlockResult;
433
434 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
435 self.stream.as_mut().poll_next(cx)
436 }
437}
438
439impl std::fmt::Debug for TraceBlockStream<'_> {
440 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441 f.debug_struct("TraceBlockStream").finish_non_exhaustive()
442 }
443}
444
445#[must_use = "streams do nothing unless polled"]
447pub struct TraceBlockOpcodeGasStream<'a> {
448 stream: Pin<Box<dyn Stream<Item = TraceBlockOpCodeGasResult> + 'a>>,
449}
450
451impl TraceBlockOpcodeGasStream<'_> {
452 pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
454 loop {
455 match self.next().await? {
456 Ok(_) => {}
457 Err(err) => return Some(err),
458 }
459 }
460 }
461}
462
463impl Stream for TraceBlockOpcodeGasStream<'_> {
464 type Item = TraceBlockOpCodeGasResult;
465
466 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
467 self.stream.as_mut().poll_next(cx)
468 }
469}
470
471impl std::fmt::Debug for TraceBlockOpcodeGasStream<'_> {
472 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
473 f.debug_struct("TraceBlockOpcodeGasStream").finish_non_exhaustive()
474 }
475}
476
477#[derive(Debug)]
485pub struct RpcComparer<C1, C2>
486where
487 C1: TraceApiExt,
488 C2: TraceApiExt,
489{
490 client1: C1,
491 client2: C2,
492}
493impl<C1, C2> RpcComparer<C1, C2>
494where
495 C1: TraceApiExt,
496 C2: TraceApiExt,
497{
498 pub const fn new(client1: C1, client2: C2) -> Self {
508 Self { client1, client2 }
509 }
510
511 pub async fn compare_trace_block_responses(&self, block_ids: Vec<BlockId>) {
517 let stream1 = self.client1.trace_block_buffered(block_ids.clone(), 2);
518 let stream2 = self.client2.trace_block_buffered(block_ids, 2);
519
520 let mut zipped_streams = stream1.zip(stream2);
521
522 while let Some((result1, result2)) = zipped_streams.next().await {
523 match (result1, result2) {
524 (Ok((ref traces1_data, ref block1)), Ok((ref traces2_data, ref block2))) => {
525 similar_asserts::assert_eq!(
526 traces1_data,
527 traces2_data,
528 "Mismatch in traces for block: {:?}",
529 block1
530 );
531 assert_eq!(block1, block2, "Mismatch in block ids.");
532 }
533 (Err((ref err1, ref block1)), Err((ref err2, ref block2))) => {
534 assert_eq!(
535 format!("{err1:?}"),
536 format!("{err2:?}"),
537 "Different errors for block: {block1:?}"
538 );
539 assert_eq!(block1, block2, "Mismatch in block ids.");
540 }
541 _ => panic!("One client returned Ok while the other returned Err."),
542 }
543 }
544 }
545
546 pub async fn compare_replay_transaction_responses(
548 &self,
549 transaction_hashes: Vec<TxHash>,
550 trace_types: HashSet<TraceType>,
551 ) {
552 let stream1 =
553 self.client1.replay_transactions(transaction_hashes.clone(), trace_types.clone());
554 let stream2 = self.client2.replay_transactions(transaction_hashes, trace_types);
555
556 let mut zipped_streams = stream1.zip(stream2);
557
558 while let Some((result1, result2)) = zipped_streams.next().await {
559 match (result1, result2) {
560 (Ok((ref trace1_data, ref tx_hash1)), Ok((ref trace2_data, ref tx_hash2))) => {
561 similar_asserts::assert_eq!(
562 trace1_data,
563 trace2_data,
564 "Mismatch in trace results for transaction: {tx_hash1:?}",
565 );
566 assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
567 }
568 (Err((ref err1, ref tx_hash1)), Err((ref err2, ref tx_hash2))) => {
569 assert_eq!(
570 format!("{err1:?}"),
571 format!("{err2:?}"),
572 "Different errors for transaction: {tx_hash1:?}",
573 );
574 assert_eq!(tx_hash1, tx_hash2, "Mismatch in transaction hashes.");
575 }
576 _ => panic!("One client returned Ok while the other returned Err."),
577 }
578 }
579 }
580}
581#[cfg(test)]
582mod tests {
583 use super::*;
584 use alloy_eips::BlockNumberOrTag;
585 use alloy_rpc_types_trace::filter::TraceFilterMode;
586 use futures::future::join;
587 use jsonrpsee::http_client::HttpClientBuilder;
588
589 const fn assert_is_stream<St: Stream>(_: &St) {}
590
591 #[tokio::test]
592 async fn can_create_block_stream() {
593 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
594 let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()];
595 let stream = client.trace_block_buffered(block, 2);
596 assert_is_stream(&stream);
597 }
598
599 #[tokio::test]
600 #[ignore]
601 async fn can_create_replay_transaction_stream() {
602 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
603
604 let transactions = vec![
606 "0x4e08fe36db723a338e852f89f613e606b0c9a17e649b18b01251f86236a2cef3".parse().unwrap(),
607 "0xea2817f1aeeb587b82f4ab87a6dbd3560fc35ed28de1be280cb40b2a24ab48bb".parse().unwrap(),
608 ];
609
610 let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
611
612 let mut stream = client.replay_transactions(transactions, trace_types);
613 let mut successes = 0;
614 let mut failures = 0;
615
616 assert_is_stream(&stream);
617
618 while let Some(result) = stream.next().await {
619 match result {
620 Ok((trace_result, tx_hash)) => {
621 println!("Success for tx_hash {tx_hash:?}: {trace_result:?}");
622 successes += 1;
623 }
624 Err((error, tx_hash)) => {
625 println!("Error for tx_hash {tx_hash:?}: {error:?}");
626 failures += 1;
627 }
628 }
629 }
630
631 println!("Total successes: {successes}");
632 println!("Total failures: {failures}");
633 }
634
635 #[tokio::test]
636 #[ignore]
637 async fn can_create_trace_call_many_stream() {
638 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
639
640 let call_request_1 = TransactionRequest::default();
641 let call_request_2 = TransactionRequest::default();
642 let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
643 let calls = vec![(call_request_1, trace_types.clone()), (call_request_2, trace_types)];
644
645 let mut stream = client.trace_call_many_stream(calls, None);
646
647 assert_is_stream(&stream);
648
649 while let Some(result) = stream.next().await {
650 match result {
651 Ok(trace_result) => {
652 println!("Success: {trace_result:?}");
653 }
654 Err(error) => {
655 println!("Error: {error:?}");
656 }
657 }
658 }
659 }
660 #[tokio::test]
661 #[ignore]
662 async fn can_create_trace_get_stream() {
663 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
664
665 let tx_hash: B256 = "".parse().unwrap();
666
667 let indices: Vec<Index> = vec![Index::from(0)];
668
669 let mut stream = client.trace_get_stream(tx_hash, indices);
670
671 while let Some(result) = stream.next().await {
672 match result {
673 Ok(trace) => {
674 println!("Received trace: {trace:?}");
675 }
676 Err(e) => {
677 println!("Error fetching trace: {e:?}");
678 }
679 }
680 }
681 }
682
683 #[tokio::test]
684 #[ignore]
685 async fn can_create_trace_filter() {
686 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
687
688 let filter = TraceFilter {
689 from_block: None,
690 to_block: None,
691 from_address: Vec::new(),
692 to_address: Vec::new(),
693 mode: TraceFilterMode::Union,
694 after: None,
695 count: None,
696 };
697
698 let filters = vec![filter];
699 let mut stream = client.trace_filter_stream(filters);
700
701 while let Some(result) = stream.next().await {
702 match result {
703 Ok(trace) => {
704 println!("Received trace: {trace:?}");
705 }
706 Err(e) => {
707 println!("Error fetching trace: {e:?}");
708 }
709 }
710 }
711 }
712
713 #[tokio::test]
714 #[ignore]
715 async fn can_create_trace_call_stream() {
716 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
717
718 let trace_call_request = TraceCallRequest::default();
719
720 let mut stream = client.trace_call_stream(trace_call_request);
721 let mut successes = 0;
722 let mut failures = 0;
723
724 assert_is_stream(&stream);
725
726 while let Some(result) = stream.next().await {
727 match result {
728 Ok(trace_result) => {
729 println!("Success: {trace_result:?}");
730 successes += 1;
731 }
732 Err((error, request)) => {
733 println!("Error for request {request:?}: {error:?}");
734 failures += 1;
735 }
736 }
737 }
738
739 println!("Total successes: {successes}");
740 println!("Total failures: {failures}");
741 }
742
743 #[tokio::test]
744 #[ignore]
745 async fn block_opcode_gas_stream() {
746 let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
747 let block = vec![BlockNumberOrTag::Latest];
748 let mut stream = client.trace_block_opcode_gas_unordered(block, 2);
749 assert_is_stream(&stream);
750 let _opcodes = stream.next().await.unwrap();
751 }
752
753 #[tokio::test(flavor = "multi_thread")]
754 #[ignore]
755 async fn compare_block_stream() {
756 let client_a = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
757 let client_b = HttpClientBuilder::default().build("http://localhost:8544").unwrap();
758 let blocks = 0u64..=1681464;
759 let mut stream_a = client_a.trace_block_buffered(blocks.clone(), 2);
760 let mut stream_b = client_b.trace_block_buffered(blocks, 2);
761
762 let mut count = 0;
763 loop {
764 let (res_a, res_b) = join(stream_a.next(), stream_b.next()).await;
765
766 if res_a.is_none() && res_b.is_none() {
767 break;
768 }
769
770 match (res_a, res_b) {
771 (Some(Ok(res_a)), Some(Ok(res_b))) => {
772 if res_a != res_b {
773 println!("Received different trace results: {res_a:?}, res_b: {res_b:?}");
774 }
775 }
776 (res_a, res_b) => {
777 println!("Received different responses: {res_a:?}, res_b: {res_b:?}");
778 }
779 }
780
781 if count % 1000 == 0 {
782 println!("Blocks traced: {count}");
783 }
784
785 count += 1;
786 }
787 println!("Total blocks traced: {count}");
788 }
789}