reth_bench/bench/
new_payload_only.rs
1use crate::{
4 bench::{
5 context::BenchContext,
6 output::{
7 NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
8 NEW_PAYLOAD_OUTPUT_SUFFIX,
9 },
10 },
11 valid_payload::call_new_payload,
12};
13use alloy_provider::Provider;
14use alloy_rpc_types_engine::ExecutionPayload;
15use clap::Parser;
16use csv::Writer;
17use reth_cli_runner::CliContext;
18use reth_node_core::args::BenchmarkArgs;
19use std::time::{Duration, Instant};
20use tracing::{debug, info};
21
22#[derive(Debug, Parser)]
24pub struct Command {
25 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
27 rpc_url: String,
28
29 #[command(flatten)]
30 benchmark: BenchmarkArgs,
31}
32
33impl Command {
34 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
36 let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
37 BenchContext::new(&self.benchmark, self.rpc_url).await?;
38
39 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
40 tokio::task::spawn(async move {
41 while benchmark_mode.contains(next_block) {
42 let block_res = block_provider.get_block_by_number(next_block.into()).full().await;
43 let block = block_res.unwrap().unwrap();
44 let block = block
45 .into_inner()
46 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
47 .try_map_transactions(|tx| {
48 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
49 })
50 .unwrap()
51 .into_consensus();
52
53 let blob_versioned_hashes =
54 block.body.blob_versioned_hashes_iter().copied().collect::<Vec<_>>();
55 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
56
57 next_block += 1;
58 sender.send((block.header, blob_versioned_hashes, payload, sidecar)).await.unwrap();
59 }
60 });
61
62 let mut results = Vec::new();
64 let total_benchmark_duration = Instant::now();
65 let mut total_wait_time = Duration::ZERO;
66
67 while let Some((header, versioned_hashes, payload, sidecar)) = {
68 let wait_start = Instant::now();
69 let result = receiver.recv().await;
70 total_wait_time += wait_start.elapsed();
71 result
72 } {
73 let gas_used = header.gas_used;
75
76 let block_number = payload.block_number();
77
78 debug!(
79 target: "reth-bench",
80 number=?header.number,
81 "Sending payload to engine",
82 );
83
84 let start = Instant::now();
85 call_new_payload(
86 &auth_provider,
87 payload,
88 sidecar,
89 header.parent_beacon_block_root,
90 versioned_hashes,
91 )
92 .await?;
93
94 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
95 info!(%new_payload_result);
96
97 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
100
101 let row = TotalGasRow { block_number, gas_used, time: current_duration };
103 results.push((row, new_payload_result));
104 }
105
106 let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
107 results.into_iter().unzip();
108
109 if let Some(path) = self.benchmark.output {
111 let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
113 info!("Writing newPayload call latency output to file: {:?}", output_path);
114 let mut writer = Writer::from_path(output_path)?;
115 for result in new_payload_results {
116 writer.serialize(result)?;
117 }
118 writer.flush()?;
119
120 let output_path = path.join(GAS_OUTPUT_SUFFIX);
122 info!("Writing total gas output to file: {:?}", output_path);
123 let mut writer = Writer::from_path(output_path)?;
124 for row in &gas_output_results {
125 writer.serialize(row)?;
126 }
127 writer.flush()?;
128
129 info!("Finished writing benchmark output files to {:?}.", path);
130 }
131
132 let gas_output = TotalGasOutput::new(gas_output_results);
134 info!(
135 total_duration=?gas_output.total_duration,
136 total_gas_used=?gas_output.total_gas_used,
137 blocks_processed=?gas_output.blocks_processed,
138 "Total Ggas/s: {:.4}",
139 gas_output.total_gigagas_per_second()
140 );
141
142 Ok(())
143 }
144}