1use crate::{
13 bench::{
14 context::BenchContext,
15 helpers::parse_duration,
16 output::{
17 write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
18 },
19 persistence_waiter::{
20 derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
21 },
22 },
23 valid_payload::{block_to_new_payload, call_forkchoice_updated, call_new_payload},
24};
25use alloy_provider::Provider;
26use alloy_rpc_types_engine::ForkchoiceState;
27use clap::Parser;
28use eyre::{Context, OptionExt};
29use reth_cli_runner::CliContext;
30use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
31use reth_node_core::args::BenchmarkArgs;
32use std::time::{Duration, Instant};
33use tracing::{debug, info};
34
35#[derive(Debug, Parser)]
37pub struct Command {
38 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
40 rpc_url: String,
41
42 #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
47 wait_time: Option<Duration>,
48
49 #[arg(long, default_value = "false", verbatim_doc_comment)]
57 wait_for_persistence: bool,
58
59 #[arg(
65 long = "persistence-threshold",
66 value_name = "PERSISTENCE_THRESHOLD",
67 default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
68 verbatim_doc_comment
69 )]
70 persistence_threshold: u64,
71
72 #[arg(
77 long = "persistence-timeout",
78 value_name = "PERSISTENCE_TIMEOUT",
79 value_parser = parse_duration,
80 default_value = "120s",
81 verbatim_doc_comment
82 )]
83 persistence_timeout: Duration,
84
85 #[arg(
88 long = "rpc-block-buffer-size",
89 value_name = "RPC_BLOCK_BUFFER_SIZE",
90 default_value = "20",
91 verbatim_doc_comment
92 )]
93 rpc_block_buffer_size: usize,
94
95 #[command(flatten)]
96 benchmark: BenchmarkArgs,
97}
98
99impl Command {
100 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
102 if let Some(duration) = self.wait_time {
104 info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
105 }
106 if self.wait_for_persistence {
107 info!(
108 target: "reth-bench",
109 "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
110 self.persistence_threshold + 1,
111 self.persistence_threshold
112 );
113 }
114
115 let mut waiter = match (self.wait_time, self.wait_for_persistence) {
118 (Some(duration), true) => {
119 let ws_url = derive_ws_rpc_url(
120 self.benchmark.ws_rpc_url.as_deref(),
121 &self.benchmark.engine_rpc_url,
122 )?;
123 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
124 Some(PersistenceWaiter::with_duration_and_subscription(
125 duration,
126 sub,
127 self.persistence_threshold,
128 self.persistence_timeout,
129 ))
130 }
131 (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
132 (None, true) => {
133 let ws_url = derive_ws_rpc_url(
134 self.benchmark.ws_rpc_url.as_deref(),
135 &self.benchmark.engine_rpc_url,
136 )?;
137 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
138 Some(PersistenceWaiter::with_subscription(
139 sub,
140 self.persistence_threshold,
141 self.persistence_timeout,
142 ))
143 }
144 (None, false) => None,
145 };
146
147 let BenchContext {
148 benchmark_mode,
149 block_provider,
150 auth_provider,
151 mut next_block,
152 is_optimism,
153 ..
154 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
155
156 let total_blocks = benchmark_mode.total_blocks();
157 let buffer_size = self.rpc_block_buffer_size;
158
159 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
161 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
162
163 tokio::task::spawn(async move {
164 while benchmark_mode.contains(next_block) {
165 let block_res = block_provider
166 .get_block_by_number(next_block.into())
167 .full()
168 .await
169 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
170 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
171 Ok(block) => block,
172 Err(e) => {
173 tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
174 let _ = error_sender.send(e);
175 break;
176 }
177 };
178
179 let head_block_hash = block.header.hash;
180 let safe_block_hash = block_provider
181 .get_block_by_number(block.header.number.saturating_sub(32).into());
182
183 let finalized_block_hash = block_provider
184 .get_block_by_number(block.header.number.saturating_sub(64).into());
185
186 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
187
188 let safe_block_hash = match safe {
189 Ok(Some(block)) => block.header.hash,
190 Ok(None) | Err(_) => head_block_hash,
191 };
192
193 let finalized_block_hash = match finalized {
194 Ok(Some(block)) => block.header.hash,
195 Ok(None) | Err(_) => head_block_hash,
196 };
197
198 next_block += 1;
199 if let Err(e) = sender
200 .send((block, head_block_hash, safe_block_hash, finalized_block_hash))
201 .await
202 {
203 tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
204 break;
205 }
206 }
207 });
208
209 let mut results = Vec::new();
210 let mut blocks_processed = 0u64;
211 let total_benchmark_duration = Instant::now();
212 let mut total_wait_time = Duration::ZERO;
213
214 while let Some((block, head, safe, finalized)) = {
215 let wait_start = Instant::now();
216 let result = receiver.recv().await;
217 total_wait_time += wait_start.elapsed();
218 result
219 } {
220 let gas_used = block.header.gas_used;
221 let gas_limit = block.header.gas_limit;
222 let block_number = block.header.number;
223 let transaction_count = block.transactions.len() as u64;
224
225 debug!(target: "reth-bench", ?block_number, "Sending payload");
226
227 let forkchoice_state = ForkchoiceState {
228 head_block_hash: head,
229 safe_block_hash: safe,
230 finalized_block_hash: finalized,
231 };
232
233 let (version, params) = block_to_new_payload(block, is_optimism)?;
234 let start = Instant::now();
235 call_new_payload(&auth_provider, version, params).await?;
236
237 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
238
239 call_forkchoice_updated(&auth_provider, version, forkchoice_state, None).await?;
240
241 let total_latency = start.elapsed();
242 let fcu_latency = total_latency - new_payload_result.latency;
243 let combined_result = CombinedResult {
244 block_number,
245 gas_limit,
246 transaction_count,
247 new_payload_result,
248 fcu_latency,
249 total_latency,
250 };
251
252 blocks_processed += 1;
255 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
256 let progress = match total_blocks {
257 Some(total) => format!("{blocks_processed}/{total}"),
258 None => format!("{blocks_processed}"),
259 };
260 info!(target: "reth-bench", progress, %combined_result);
261
262 if let Some(w) = &mut waiter {
263 w.on_block(block_number).await?;
264 }
265
266 let gas_row =
267 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
268 results.push((gas_row, combined_result));
269 }
270
271 if let Ok(error) = error_receiver.try_recv() {
273 return Err(error);
274 }
275
276 drop(waiter);
279
280 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
281 results.into_iter().unzip();
282
283 if let Some(ref path) = self.benchmark.output {
284 write_benchmark_results(path, &gas_output_results, &combined_results)?;
285 }
286
287 let gas_output =
288 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
289
290 info!(
291 target: "reth-bench",
292 total_gas_used = gas_output.total_gas_used,
293 total_duration = ?gas_output.total_duration,
294 execution_duration = ?gas_output.execution_duration,
295 blocks_processed = gas_output.blocks_processed,
296 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
297 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
298 "Benchmark complete"
299 );
300
301 Ok(())
302 }
303}