use alloy_eips::BlockId;
use alloy_primitives::{map::HashSet, Bytes, TxHash, B256};
use alloy_rpc_types_eth::{transaction::TransactionRequest, Index};
use alloy_rpc_types_trace::{
filter::TraceFilter,
opcode::BlockOpcodeGas,
parity::{LocalizedTransactionTrace, TraceResults, TraceType},
tracerequest::TraceCallRequest,
};
use futures::{Stream, StreamExt};
use jsonrpsee::core::client::Error as RpcError;
use reth_rpc_api::clients::TraceApiClient;
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Boxed stream type backing [`RawTransactionTraceStream`].
///
/// Every item carries the raw transaction bytes next to either the trace results (`Ok`) or the
/// RPC error (`Err`).
type RawTransactionTraceResult<'a> =
Pin<Box<dyn Stream<Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>> + 'a>>;
/// Item yielded by [`TraceBlockStream`]: the traces of a block, or the RPC error, each paired
/// with the [`BlockId`] that was requested.
pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
/// Item yielded by [`TraceBlockOpcodeGasStream`]: the opcode gas usage of a block, or the RPC
/// error, each paired with the [`BlockId`] that was requested.
pub type TraceBlockOpCodeGasResult = Result<(BlockOpcodeGas, BlockId), (RpcError, BlockId)>;
/// Item yielded by [`ReplayTransactionStream`]: the replay result, or the RPC error, each paired
/// with the hash of the replayed transaction.
pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
/// Item yielded by [`CallManyTraceStream`]: the trace results of all calls, or the RPC error,
/// each paired with the list of calls that was submitted.
pub type CallManyTraceResult = Result<
(Vec<TraceResults>, Vec<(TransactionRequest, HashSet<TraceType>)>),
(RpcError, Vec<(TransactionRequest, HashSet<TraceType>)>),
>;
/// Item yielded by [`TraceGetStream`]: the optional trace, or the RPC error, each paired with
/// the transaction hash and the index list that was sent with the request.
pub type TraceGetResult =
Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
/// Item yielded by [`TraceFilterStream`]: the matching traces, or the RPC error, each paired
/// with the [`TraceFilter`] that produced them.
pub type TraceFilterResult =
Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;
/// Item yielded by [`TraceCallStream`]: the trace results on success, or the RPC error together
/// with the [`TraceCallRequest`] that failed. Note that only the error side carries the request.
pub type TraceCallResult = Result<TraceResults, (RpcError, TraceCallRequest)>;
/// Extension trait that exposes trace RPC endpoints as [`Stream`]s.
///
/// A blanket implementation is provided in this file for every `TraceApiClient + Sync` type;
/// the per-method notes below describe what that implementation does.
pub trait TraceApiExt {
/// Associated client type; the blanket implementation sets it to the implementing type.
type Provider;
/// Returns a stream that yields the `trace_block` response for each of the given blocks.
///
/// `n` is forwarded to [`StreamExt::buffered`]: up to `n` requests are in flight at once and
/// items are yielded in the order the blocks were supplied. Each item carries the requested
/// [`BlockId`]. A response without a trace list is yielded as the default (empty) list.
fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
/// Returns a stream that yields the `trace_block` response for each of the given blocks.
///
/// `n` is forwarded to [`StreamExt::buffer_unordered`]: up to `n` requests are in flight at
/// once and items are yielded as they complete, not in input order. Each item carries the
/// requested [`BlockId`] so results can be correlated.
fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
/// Returns a stream that yields the `trace_block_opcode_gas` response for each of the given
/// blocks, with up to `n` requests in flight at once.
///
/// As the name indicates, callers must not rely on the order of the yielded items; use the
/// [`BlockId`] carried by each item to correlate results with requests.
///
/// # Panics
///
/// The blanket implementation panics while the stream is polled if a successful response
/// contains no opcode gas data for a block.
fn trace_block_opcode_gas_unordered<I, B>(
&self,
params: I,
n: usize,
) -> TraceBlockOpcodeGasStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
/// Returns a stream that replays each of the given transactions with the same `trace_types`.
///
/// The blanket implementation keeps up to 10 requests in flight and yields results in input
/// order; every item carries the hash of the transaction it belongs to.
fn replay_transactions<I>(
&self,
tx_hashes: I,
trace_types: HashSet<TraceType>,
) -> ReplayTransactionStream<'_>
where
I: IntoIterator<Item = TxHash>;
/// Returns a single-item stream that traces the given raw transaction.
///
/// The yielded item carries the raw transaction bytes next to the result or the error.
fn trace_raw_transaction_stream(
&self,
data: Bytes,
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> RawTransactionTraceStream<'_>;
/// Returns a single-item stream that submits all `calls` in one `trace_call_many` request.
///
/// The yielded item carries the submitted call list next to the results or the error.
fn trace_call_many_stream<I>(
&self,
calls: I,
block_id: Option<BlockId>,
) -> CallManyTraceStream<'_>
where
I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>;
/// Returns a stream that issues one `trace_get` request per entry of `indices` for the
/// transaction `hash`.
///
/// Each item carries `hash` and a single-element index list holding the index it was
/// requested with. The blanket implementation keeps up to 10 requests in flight and yields
/// results in input order.
fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
where
I: IntoIterator<Item = Index>;
/// Returns a stream that issues one `trace_filter` request per given filter.
///
/// Each item carries the filter it was requested with. The blanket implementation keeps up
/// to 10 requests in flight and yields results in input order.
fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
where
I: IntoIterator<Item = TraceFilter>;
/// Returns a single-item stream that performs the given `trace_call` request.
///
/// On failure the item carries the original request next to the error; on success only the
/// trace results are yielded.
fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_>;
}
/// A stream that yields the outcome of a single `trace_call` request.
///
/// Created by [`TraceApiExt::trace_call_stream`]. On failure the item carries the original
/// [`TraceCallRequest`] next to the error.
#[must_use = "streams do nothing unless polled"]
pub struct TraceCallStream<'a> {
    /// Type-erased inner stream that performs the request.
    stream: Pin<Box<dyn Stream<Item = TraceCallResult> + 'a>>,
}

impl Stream for TraceCallStream<'_> {
    type Item = TraceCallResult;

    /// Delegates to the boxed inner stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for TraceCallStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed stream cannot be printed, so mark the output as non-exhaustive instead of
        // pretending the struct has no fields (matches the other stream wrappers in this file).
        f.debug_struct("TraceCallStream").finish_non_exhaustive()
    }
}
/// A stream that yields one `trace_filter` outcome per submitted [`TraceFilter`].
///
/// Created by [`TraceApiExt::trace_filter_stream`].
#[must_use = "streams do nothing unless polled"]
pub struct TraceFilterStream<'a> {
    /// Type-erased inner stream that performs the requests.
    stream: Pin<Box<dyn Stream<Item = TraceFilterResult> + 'a>>,
}

impl Stream for TraceFilterStream<'_> {
    type Item = TraceFilterResult;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is a pinned box, so the wrapper itself is `Unpin`.
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for TraceFilterStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner stream is opaque; only the type name is printed.
        let mut repr = f.debug_struct("TraceFilterStream");
        repr.finish_non_exhaustive()
    }
}
/// A stream that yields one `trace_get` outcome per requested index.
///
/// Created by [`TraceApiExt::trace_get_stream`].
#[must_use = "streams do nothing unless polled"]
pub struct TraceGetStream<'a> {
    /// Type-erased inner stream that performs the requests.
    stream: Pin<Box<dyn Stream<Item = TraceGetResult> + 'a>>,
}

impl Stream for TraceGetStream<'_> {
    type Item = TraceGetResult;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is a pinned box, so the wrapper itself is `Unpin`.
        let inner = &mut Pin::get_mut(self).stream;
        inner.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for TraceGetStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner stream is opaque; only the type name is printed.
        let mut repr = f.debug_struct("TraceGetStream");
        repr.finish_non_exhaustive()
    }
}
/// A stream that yields the outcome of a single `trace_call_many` request.
///
/// Created by [`TraceApiExt::trace_call_many_stream`]. The item carries the submitted call list
/// next to the results or the error.
#[must_use = "streams do nothing unless polled"]
pub struct CallManyTraceStream<'a> {
    /// Type-erased inner stream that performs the request.
    stream: Pin<Box<dyn Stream<Item = CallManyTraceResult> + 'a>>,
}

impl Stream for CallManyTraceStream<'_> {
    type Item = CallManyTraceResult;

    /// Delegates to the boxed inner stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for CallManyTraceStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed stream cannot be printed, so mark the output as non-exhaustive instead of
        // pretending the struct has no fields (matches the other stream wrappers in this file).
        f.debug_struct("CallManyTraceStream").finish_non_exhaustive()
    }
}
/// A stream that yields the outcome of a single `trace_raw_transaction` request.
///
/// Created by [`TraceApiExt::trace_raw_transaction_stream`]. The item carries the raw transaction
/// bytes next to the trace results or the error.
#[must_use = "streams do nothing unless polled"]
pub struct RawTransactionTraceStream<'a> {
    /// Type-erased inner stream that performs the request.
    stream: RawTransactionTraceResult<'a>,
}

impl Stream for RawTransactionTraceStream<'_> {
    type Item = Result<(TraceResults, Bytes), (RpcError, Bytes)>;

    /// Delegates to the boxed inner stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for RawTransactionTraceStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed stream cannot be printed, so mark the output as non-exhaustive instead of
        // pretending the struct has no fields (matches the other stream wrappers in this file).
        f.debug_struct("RawTransactionTraceStream").finish_non_exhaustive()
    }
}
/// A stream that yields one `replay_transaction` outcome per requested transaction hash.
///
/// Created by [`TraceApiExt::replay_transactions`]. Every item carries the hash of the
/// transaction it belongs to.
#[must_use = "streams do nothing unless polled"]
pub struct ReplayTransactionStream<'a> {
    /// Type-erased inner stream that performs the requests.
    stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
}

impl Stream for ReplayTransactionStream<'_> {
    type Item = ReplayTransactionResult;

    /// Delegates to the boxed inner stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.stream.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for ReplayTransactionStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed stream cannot be printed, so mark the output as non-exhaustive instead of
        // pretending the struct has no fields (matches the other stream wrappers in this file).
        f.debug_struct("ReplayTransactionStream").finish_non_exhaustive()
    }
}
impl<T: TraceApiClient + Sync> TraceApiExt for T {
type Provider = T;
fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>,
{
let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
match self.trace_block(block).await {
Ok(result) => Ok((result.unwrap_or_default(), block)),
Err(err) => Err((err, block)),
}
}))
.buffered(n);
TraceBlockStream { stream: Box::pin(stream) }
}
fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>,
{
let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
match self.trace_block(block).await {
Ok(result) => Ok((result.unwrap_or_default(), block)),
Err(err) => Err((err, block)),
}
}))
.buffer_unordered(n);
TraceBlockStream { stream: Box::pin(stream) }
}
fn trace_block_opcode_gas_unordered<I, B>(
&self,
params: I,
n: usize,
) -> TraceBlockOpcodeGasStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>,
{
let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
match self.trace_block_opcode_gas(block).await {
Ok(result) => Ok((result.unwrap(), block)),
Err(err) => Err((err, block)),
}
}))
.buffered(n);
TraceBlockOpcodeGasStream { stream: Box::pin(stream) }
}
fn replay_transactions<I>(
&self,
tx_hashes: I,
trace_types: HashSet<TraceType>,
) -> ReplayTransactionStream<'_>
where
I: IntoIterator<Item = TxHash>,
{
let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
let trace_types_clone = trace_types.clone(); async move {
match self.replay_transaction(hash, trace_types_clone).await {
Ok(result) => Ok((result, hash)),
Err(err) => Err((err, hash)),
}
}
}))
.buffered(10);
ReplayTransactionStream { stream: Box::pin(stream) }
}
fn trace_raw_transaction_stream(
&self,
data: Bytes,
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> RawTransactionTraceStream<'_> {
let stream = futures::stream::once(async move {
match self.trace_raw_transaction(data.clone(), trace_types, block_id).await {
Ok(result) => Ok((result, data)),
Err(err) => Err((err, data)),
}
});
RawTransactionTraceStream { stream: Box::pin(stream) }
}
fn trace_call_many_stream<I>(
&self,
calls: I,
block_id: Option<BlockId>,
) -> CallManyTraceStream<'_>
where
I: IntoIterator<Item = (TransactionRequest, HashSet<TraceType>)>,
{
let call_set = calls.into_iter().collect::<Vec<_>>();
let stream = futures::stream::once(async move {
match self.trace_call_many(call_set.clone(), block_id).await {
Ok(results) => Ok((results, call_set)),
Err(err) => Err((err, call_set)),
}
});
CallManyTraceStream { stream: Box::pin(stream) }
}
fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
where
I: IntoIterator<Item = Index>,
{
let index_list = indices.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(index_list.into_iter().map(move |index| async move {
match self.trace_get(hash, vec![index]).await {
Ok(result) => Ok((result, hash, vec![index])),
Err(err) => Err((err, hash, vec![index])),
}
}))
.buffered(10);
TraceGetStream { stream: Box::pin(stream) }
}
fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
where
I: IntoIterator<Item = TraceFilter>,
{
let filter_list = filters.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(filter_list.into_iter().map(move |filter| async move {
match self.trace_filter(filter.clone()).await {
Ok(result) => Ok((result, filter)),
Err(err) => Err((err, filter)),
}
}))
.buffered(10);
TraceFilterStream { stream: Box::pin(stream) }
}
fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_> {
let stream = futures::stream::once(async move {
match self
.trace_call(
request.call.clone(),
request.trace_types.clone(),
request.block_id,
request.state_overrides.clone(),
request.block_overrides.clone(),
)
.await
{
Ok(result) => Ok(result),
Err(err) => Err((err, request)),
}
});
TraceCallStream { stream: Box::pin(stream) }
}
}
/// A stream that yields one `trace_block` outcome per requested block.
///
/// Created by [`TraceApiExt::trace_block_buffered`] and
/// [`TraceApiExt::trace_block_buffered_unordered`].
#[must_use = "streams do nothing unless polled"]
pub struct TraceBlockStream<'a> {
    /// Type-erased inner stream that performs the requests.
    stream: Pin<Box<dyn Stream<Item = TraceBlockResult> + 'a>>,
}

impl TraceBlockStream<'_> {
    /// Drives the stream forward, discarding successful items, and returns the first error
    /// together with its [`BlockId`]. Returns `None` once the stream ends without an error.
    pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
        while let Some(item) = self.next().await {
            if let Err(failure) = item {
                return Some(failure);
            }
        }
        None
    }
}

impl Stream for TraceBlockStream<'_> {
    type Item = TraceBlockResult;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is a pinned box, so the wrapper itself is `Unpin`.
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for TraceBlockStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner stream is opaque; only the type name is printed.
        let mut repr = f.debug_struct("TraceBlockStream");
        repr.finish_non_exhaustive()
    }
}
/// A stream that yields one `trace_block_opcode_gas` outcome per requested block.
///
/// Created by [`TraceApiExt::trace_block_opcode_gas_unordered`].
#[must_use = "streams do nothing unless polled"]
pub struct TraceBlockOpcodeGasStream<'a> {
    /// Type-erased inner stream that performs the requests.
    stream: Pin<Box<dyn Stream<Item = TraceBlockOpCodeGasResult> + 'a>>,
}

impl TraceBlockOpcodeGasStream<'_> {
    /// Drives the stream forward, discarding successful items, and returns the first error
    /// together with its [`BlockId`]. Returns `None` once the stream ends without an error.
    pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
        while let Some(outcome) = self.next().await {
            match outcome {
                Err(failure) => return Some(failure),
                Ok(_) => {}
            }
        }
        None
    }
}

impl Stream for TraceBlockOpcodeGasStream<'_> {
    type Item = TraceBlockOpCodeGasResult;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is a pinned box, so the wrapper itself is `Unpin`.
        let inner = &mut Pin::get_mut(self).stream;
        inner.as_mut().poll_next(cx)
    }
}

impl std::fmt::Debug for TraceBlockOpcodeGasStream<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner stream is opaque; only the type name is printed.
        let mut repr = f.debug_struct("TraceBlockOpcodeGasStream");
        repr.finish_non_exhaustive()
    }
}
/// Holds two [`TraceApiExt`] clients so that their responses to the same requests can be
/// compared against each other.
#[derive(Debug)]
pub struct RpcComparer<C1, C2>
where
C1: TraceApiExt,
C2: TraceApiExt,
{
/// The first client; its results are the left-hand side of every comparison.
client1: C1,
/// The second client; its results are the right-hand side of every comparison.
client2: C2,
}
impl<C1, C2> RpcComparer<C1, C2>
where
    C1: TraceApiExt,
    C2: TraceApiExt,
{
    /// Creates a comparer over the two given clients.
    pub const fn new(client1: C1, client2: C2) -> Self {
        Self { client1, client2 }
    }

    /// Requests the traces of every block in `block_ids` from both clients and asserts that the
    /// responses agree pairwise.
    ///
    /// # Panics
    ///
    /// Panics if the traces, the block ids, or the debug-formatted errors differ, or if one
    /// client succeeds where the other fails.
    pub async fn compare_trace_block_responses(&self, block_ids: Vec<BlockId>) {
        let first_stream = self.client1.trace_block_buffered(block_ids.clone(), 2);
        let second_stream = self.client2.trace_block_buffered(block_ids, 2);
        let mut paired = first_stream.zip(second_stream);

        while let Some((first, second)) = paired.next().await {
            match (&first, &second) {
                (Ok((traces_first, block1)), Ok((traces_second, block_second))) => {
                    similar_asserts::assert_eq!(
                        traces_first,
                        traces_second,
                        "Mismatch in traces for block: {:?}",
                        block1
                    );
                    assert_eq!(block1, block_second, "Mismatch in block ids.");
                }
                (Err((err1, block1)), Err((err2, block_second))) => {
                    // The errors are compared through their debug representation.
                    let rendered_first = format!("{err1:?}");
                    let rendered_second = format!("{err2:?}");
                    assert_eq!(
                        rendered_first,
                        rendered_second,
                        "Different errors for block: {block1:?}"
                    );
                    assert_eq!(block1, block_second, "Mismatch in block ids.");
                }
                _ => panic!("One client returned Ok while the other returned Err."),
            }
        }
    }

    /// Replays every transaction in `transaction_hashes` on both clients with the same
    /// `trace_types` and asserts that the responses agree pairwise.
    ///
    /// # Panics
    ///
    /// Panics if the trace results, the transaction hashes, or the debug-formatted errors
    /// differ, or if one client succeeds where the other fails.
    pub async fn compare_replay_transaction_responses(
        &self,
        transaction_hashes: Vec<TxHash>,
        trace_types: HashSet<TraceType>,
    ) {
        let first_stream =
            self.client1.replay_transactions(transaction_hashes.clone(), trace_types.clone());
        let second_stream = self.client2.replay_transactions(transaction_hashes, trace_types);
        let mut paired = first_stream.zip(second_stream);

        while let Some((first, second)) = paired.next().await {
            match (&first, &second) {
                (Ok((trace_first, tx_hash1)), Ok((trace_second, tx_hash_second))) => {
                    similar_asserts::assert_eq!(
                        trace_first,
                        trace_second,
                        "Mismatch in trace results for transaction: {tx_hash1:?}",
                    );
                    assert_eq!(tx_hash1, tx_hash_second, "Mismatch in transaction hashes.");
                }
                (Err((err1, tx_hash1)), Err((err2, tx_hash_second))) => {
                    // The errors are compared through their debug representation.
                    let rendered_first = format!("{err1:?}");
                    let rendered_second = format!("{err2:?}");
                    assert_eq!(
                        rendered_first,
                        rendered_second,
                        "Different errors for transaction: {tx_hash1:?}",
                    );
                    assert_eq!(tx_hash1, tx_hash_second, "Mismatch in transaction hashes.");
                }
                _ => panic!("One client returned Ok while the other returned Err."),
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::BlockNumberOrTag;
    use alloy_rpc_types_trace::filter::TraceFilterMode;
    use jsonrpsee::http_client::HttpClientBuilder;

    /// Compile-time check that the argument implements [`Stream`].
    const fn assert_is_stream<St: Stream>(_: &St) {}

    /// Only builds the stream; it is never polled, so no request is sent.
    #[tokio::test]
    async fn can_create_block_stream() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
        let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()];
        let stream = client.trace_block_buffered(block, 2);
        assert_is_stream(&stream);
    }

    /// Polls the replay stream against the configured endpoint and prints every outcome.
    #[tokio::test]
    #[ignore]
    async fn can_create_replay_transaction_stream() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();

        // Replace with valid transaction hashes known to the node under test.
        let transactions = vec![
            "0x4e08fe36db723a338e852f89f613e606b0c9a17e649b18b01251f86236a2cef3".parse().unwrap(),
            "0xea2817f1aeeb587b82f4ab87a6dbd3560fc35ed28de1be280cb40b2a24ab48bb".parse().unwrap(),
        ];

        let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);

        let mut stream = client.replay_transactions(transactions, trace_types);
        let mut successes = 0;
        let mut failures = 0;

        assert_is_stream(&stream);

        while let Some(result) = stream.next().await {
            match result {
                Ok((trace_result, tx_hash)) => {
                    println!("Success for tx_hash {tx_hash:?}: {trace_result:?}");
                    successes += 1;
                }
                Err((error, tx_hash)) => {
                    println!("Error for tx_hash {tx_hash:?}: {error:?}");
                    failures += 1;
                }
            }
        }

        println!("Total successes: {successes}");
        println!("Total failures: {failures}");
    }

    /// Polls the call-many stream against the configured endpoint and prints the outcome.
    #[tokio::test]
    #[ignore]
    async fn can_create_trace_call_many_stream() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();

        let call_request_1 = TransactionRequest::default();
        let call_request_2 = TransactionRequest::default();
        let trace_types = HashSet::from_iter([TraceType::StateDiff, TraceType::VmTrace]);
        let calls = vec![(call_request_1, trace_types.clone()), (call_request_2, trace_types)];

        let mut stream = client.trace_call_many_stream(calls, None);

        assert_is_stream(&stream);

        while let Some(result) = stream.next().await {
            match result {
                Ok(trace_result) => {
                    println!("Success: {trace_result:?}");
                }
                Err(error) => {
                    println!("Error: {error:?}");
                }
            }
        }
    }

    /// Polls the trace-get stream against the configured endpoint and prints every outcome.
    #[tokio::test]
    #[ignore]
    async fn can_create_trace_get_stream() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();

        // An empty string is not a valid 32-byte hash and would panic on `unwrap` before any
        // request is made. Replace with a transaction hash known to the node under test.
        let tx_hash: B256 =
            "0x4e08fe36db723a338e852f89f613e606b0c9a17e649b18b01251f86236a2cef3".parse().unwrap();

        let indices: Vec<Index> = vec![Index::from(0)];

        let mut stream = client.trace_get_stream(tx_hash, indices);

        while let Some(result) = stream.next().await {
            match result {
                Ok(trace) => {
                    println!("Received trace: {trace:?}");
                }
                Err(e) => {
                    println!("Error fetching trace: {e:?}");
                }
            }
        }
    }

    /// Polls the trace-filter stream against the configured endpoint and prints every outcome.
    #[tokio::test]
    #[ignore]
    async fn can_create_trace_filter() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();

        let filter = TraceFilter {
            from_block: None,
            to_block: None,
            from_address: Vec::new(),
            to_address: Vec::new(),
            mode: TraceFilterMode::Union,
            after: None,
            count: None,
        };

        let filters = vec![filter];
        let mut stream = client.trace_filter_stream(filters);

        while let Some(result) = stream.next().await {
            match result {
                Ok(trace) => {
                    println!("Received trace: {trace:?}");
                }
                Err(e) => {
                    println!("Error fetching trace: {e:?}");
                }
            }
        }
    }

    /// Polls the trace-call stream against the configured endpoint and prints the outcome.
    #[tokio::test]
    #[ignore]
    async fn can_create_trace_call_stream() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();

        let trace_call_request = TraceCallRequest::default();

        let mut stream = client.trace_call_stream(trace_call_request);
        let mut successes = 0;
        let mut failures = 0;

        assert_is_stream(&stream);

        while let Some(result) = stream.next().await {
            match result {
                Ok(trace_result) => {
                    println!("Success: {trace_result:?}");
                    successes += 1;
                }
                Err((error, request)) => {
                    println!("Error for request {request:?}: {error:?}");
                    failures += 1;
                }
            }
        }

        println!("Total successes: {successes}");
        println!("Total failures: {failures}");
    }

    /// Polls the opcode-gas stream once against the configured endpoint.
    #[tokio::test]
    #[ignore]
    async fn block_opcode_gas_stream() {
        let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
        let block = vec![BlockNumberOrTag::Latest];
        let mut stream = client.trace_block_opcode_gas_unordered(block, 2);
        assert_is_stream(&stream);
        let _opcodes = stream.next().await.unwrap();
    }
}