1use crate::{
5 bench::{
6 context::BenchContext,
7 output::{
8 CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9 GAS_OUTPUT_SUFFIX,
10 },
11 },
12 valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
13};
14use alloy_provider::Provider;
15use alloy_rpc_types_engine::ForkchoiceState;
16use clap::Parser;
17use csv::Writer;
18use eyre::{Context, OptionExt};
19use humantime::parse_duration;
20use reth_cli_runner::CliContext;
21use reth_node_core::args::BenchmarkArgs;
22use std::time::{Duration, Instant};
23use tracing::{debug, info};
24
25#[derive(Debug, Parser)]
27pub struct Command {
28 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
30 rpc_url: String,
31
32 #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, default_value = "250ms", verbatim_doc_comment)]
34 wait_time: Duration,
35
36 #[arg(
39 long = "rpc-block-buffer-size",
40 value_name = "RPC_BLOCK_BUFFER_SIZE",
41 default_value = "20",
42 verbatim_doc_comment
43 )]
44 rpc_block_buffer_size: usize,
45
46 #[command(flatten)]
47 benchmark: BenchmarkArgs,
48}
49
50impl Command {
51 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
53 let BenchContext {
54 benchmark_mode,
55 block_provider,
56 auth_provider,
57 mut next_block,
58 is_optimism,
59 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
60
61 let buffer_size = self.rpc_block_buffer_size;
62
63 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
65 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
66
67 tokio::task::spawn(async move {
68 while benchmark_mode.contains(next_block) {
69 let block_res = block_provider
70 .get_block_by_number(next_block.into())
71 .full()
72 .await
73 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
74 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
75 Ok(block) => block,
76 Err(e) => {
77 tracing::error!("Failed to fetch block {next_block}: {e}");
78 let _ = error_sender.send(e);
79 break;
80 }
81 };
82 let header = block.header.clone();
83
84 let (version, params) = match block_to_new_payload(block, is_optimism) {
85 Ok(result) => result,
86 Err(e) => {
87 tracing::error!("Failed to convert block to new payload: {e}");
88 let _ = error_sender.send(e);
89 break;
90 }
91 };
92 let head_block_hash = header.hash;
93 let safe_block_hash =
94 block_provider.get_block_by_number(header.number.saturating_sub(32).into());
95
96 let finalized_block_hash =
97 block_provider.get_block_by_number(header.number.saturating_sub(64).into());
98
99 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
100
101 let safe_block_hash = match safe {
102 Ok(Some(block)) => block.header.hash,
103 Ok(None) | Err(_) => head_block_hash,
104 };
105
106 let finalized_block_hash = match finalized {
107 Ok(Some(block)) => block.header.hash,
108 Ok(None) | Err(_) => head_block_hash,
109 };
110
111 next_block += 1;
112 if let Err(e) = sender
113 .send((
114 header,
115 version,
116 params,
117 head_block_hash,
118 safe_block_hash,
119 finalized_block_hash,
120 ))
121 .await
122 {
123 tracing::error!("Failed to send block data: {e}");
124 break;
125 }
126 }
127 });
128
129 let mut results = Vec::new();
131 let total_benchmark_duration = Instant::now();
132 let mut total_wait_time = Duration::ZERO;
133
134 while let Some((header, version, params, head, safe, finalized)) = {
135 let wait_start = Instant::now();
136 let result = receiver.recv().await;
137 total_wait_time += wait_start.elapsed();
138 result
139 } {
140 let gas_used = header.gas_used;
142 let block_number = header.number;
143
144 debug!(target: "reth-bench", ?block_number, "Sending payload",);
145
146 let forkchoice_state = ForkchoiceState {
148 head_block_hash: head,
149 safe_block_hash: safe,
150 finalized_block_hash: finalized,
151 };
152
153 let start = Instant::now();
154 call_new_payload(&auth_provider, version, params).await?;
155
156 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
157
158 call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
159
160 let total_latency = start.elapsed();
162 let fcu_latency = total_latency - new_payload_result.latency;
163 let combined_result =
164 CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
165
166 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
169
170 info!(%combined_result);
172
173 tokio::time::sleep(self.wait_time).await;
175
176 let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
178 results.push((gas_row, combined_result));
179 }
180
181 if let Ok(error) = error_receiver.try_recv() {
183 return Err(error);
184 }
185
186 let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
187 results.into_iter().unzip();
188
189 if let Some(path) = self.benchmark.output {
191 let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
193 info!("Writing engine api call latency output to file: {:?}", output_path);
194 let mut writer = Writer::from_path(output_path)?;
195 for result in combined_results {
196 writer.serialize(result)?;
197 }
198 writer.flush()?;
199
200 let output_path = path.join(GAS_OUTPUT_SUFFIX);
202 info!("Writing total gas output to file: {:?}", output_path);
203 let mut writer = Writer::from_path(output_path)?;
204 for row in &gas_output_results {
205 writer.serialize(row)?;
206 }
207 writer.flush()?;
208
209 info!("Finished writing benchmark output files to {:?}.", path);
210 }
211
212 let gas_output = TotalGasOutput::new(gas_output_results)?;
214 info!(
215 total_duration=?gas_output.total_duration,
216 total_gas_used=?gas_output.total_gas_used,
217 blocks_processed=?gas_output.blocks_processed,
218 "Total Ggas/s: {:.4}",
219 gas_output.total_gigagas_per_second()
220 );
221
222 Ok(())
223 }
224}