reth_bench/bench/
new_payload_only.rs1use crate::{
4 bench::{
5 context::BenchContext,
6 output::{
7 NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
8 NEW_PAYLOAD_OUTPUT_SUFFIX,
9 },
10 },
11 valid_payload::{block_to_new_payload, call_new_payload},
12};
13use alloy_provider::Provider;
14use clap::Parser;
15use csv::Writer;
16use eyre::{Context, OptionExt};
17use reth_cli_runner::CliContext;
18use reth_node_core::args::BenchmarkArgs;
19use std::time::{Duration, Instant};
20use tracing::{debug, info};
21
22#[derive(Debug, Parser)]
24pub struct Command {
25 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
27 rpc_url: String,
28
29 #[arg(
32 long = "rpc-block-buffer-size",
33 value_name = "RPC_BLOCK_BUFFER_SIZE",
34 default_value = "20",
35 verbatim_doc_comment
36 )]
37 rpc_block_buffer_size: usize,
38
39 #[command(flatten)]
40 benchmark: BenchmarkArgs,
41}
42
43impl Command {
44 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
46 let BenchContext {
47 benchmark_mode,
48 block_provider,
49 auth_provider,
50 mut next_block,
51 is_optimism,
52 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
53
54 let buffer_size = self.rpc_block_buffer_size;
55
56 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
58 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
59
60 tokio::task::spawn(async move {
61 while benchmark_mode.contains(next_block) {
62 let block_res = block_provider
63 .get_block_by_number(next_block.into())
64 .full()
65 .await
66 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
67 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
68 Ok(block) => block,
69 Err(e) => {
70 tracing::error!("Failed to fetch block {next_block}: {e}");
71 let _ = error_sender.send(e);
72 break;
73 }
74 };
75 let header = block.header.clone();
76
77 let (version, params) = match block_to_new_payload(block, is_optimism) {
78 Ok(result) => result,
79 Err(e) => {
80 tracing::error!("Failed to convert block to new payload: {e}");
81 let _ = error_sender.send(e);
82 break;
83 }
84 };
85
86 next_block += 1;
87 if let Err(e) = sender.send((header, version, params)).await {
88 tracing::error!("Failed to send block data: {e}");
89 break;
90 }
91 }
92 });
93
94 let mut results = Vec::new();
96 let total_benchmark_duration = Instant::now();
97 let mut total_wait_time = Duration::ZERO;
98
99 while let Some((header, version, params)) = {
100 let wait_start = Instant::now();
101 let result = receiver.recv().await;
102 total_wait_time += wait_start.elapsed();
103 result
104 } {
105 let gas_used = header.gas_used;
107
108 let block_number = header.number;
109
110 debug!(
111 target: "reth-bench",
112 number=?header.number,
113 "Sending payload to engine",
114 );
115
116 let start = Instant::now();
117 call_new_payload(&auth_provider, version, params).await?;
118
119 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
120 info!(%new_payload_result);
121
122 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
125
126 let row = TotalGasRow { block_number, gas_used, time: current_duration };
128 results.push((row, new_payload_result));
129 }
130
131 if let Ok(error) = error_receiver.try_recv() {
133 return Err(error);
134 }
135
136 let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
137 results.into_iter().unzip();
138
139 if let Some(path) = self.benchmark.output {
141 let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
143 info!("Writing newPayload call latency output to file: {:?}", output_path);
144 let mut writer = Writer::from_path(output_path)?;
145 for result in new_payload_results {
146 writer.serialize(result)?;
147 }
148 writer.flush()?;
149
150 let output_path = path.join(GAS_OUTPUT_SUFFIX);
152 info!("Writing total gas output to file: {:?}", output_path);
153 let mut writer = Writer::from_path(output_path)?;
154 for row in &gas_output_results {
155 writer.serialize(row)?;
156 }
157 writer.flush()?;
158
159 info!("Finished writing benchmark output files to {:?}.", path);
160 }
161
162 let gas_output = TotalGasOutput::new(gas_output_results)?;
164 info!(
165 total_duration=?gas_output.total_duration,
166 total_gas_used=?gas_output.total_gas_used,
167 blocks_processed=?gas_output.blocks_processed,
168 "Total Ggas/s: {:.4}",
169 gas_output.total_gigagas_per_second()
170 );
171
172 Ok(())
173 }
174}