1use crate::{
13 bench::{
14 context::BenchContext,
15 helpers::parse_duration,
16 metrics_scraper::MetricsScraper,
17 output::{
18 write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
19 },
20 persistence_waiter::{
21 derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
22 },
23 },
24 valid_payload::{
25 block_to_new_payload, call_forkchoice_updated_with_reth, call_new_payload_with_reth,
26 },
27};
28use alloy_provider::{ext::DebugApi, Provider};
29use alloy_rpc_types_engine::ForkchoiceState;
30use clap::Parser;
31use eyre::{Context, OptionExt};
32use reth_cli_runner::CliContext;
33use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
34use reth_node_core::args::BenchmarkArgs;
35use std::time::{Duration, Instant};
36use tracing::{debug, info, warn};
37
38#[derive(Debug, Parser)]
40pub struct Command {
41 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
43 rpc_url: String,
44
45 #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
50 wait_time: Option<Duration>,
51
52 #[arg(long, default_value = "false", verbatim_doc_comment)]
60 wait_for_persistence: bool,
61
62 #[arg(
68 long = "persistence-threshold",
69 value_name = "PERSISTENCE_THRESHOLD",
70 default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
71 verbatim_doc_comment
72 )]
73 persistence_threshold: u64,
74
75 #[arg(
80 long = "persistence-timeout",
81 value_name = "PERSISTENCE_TIMEOUT",
82 value_parser = parse_duration,
83 default_value = "120s",
84 verbatim_doc_comment
85 )]
86 persistence_timeout: Duration,
87
88 #[arg(
91 long = "rpc-block-buffer-size",
92 value_name = "RPC_BLOCK_BUFFER_SIZE",
93 default_value = "20",
94 verbatim_doc_comment
95 )]
96 rpc_block_buffer_size: usize,
97
98 #[command(flatten)]
99 benchmark: BenchmarkArgs,
100}
101
102impl Command {
103 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
105 if let Some(duration) = self.wait_time {
107 info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
108 }
109 if self.wait_for_persistence {
110 info!(
111 target: "reth-bench",
112 "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
113 self.persistence_threshold + 1,
114 self.persistence_threshold
115 );
116 }
117
118 let mut waiter = match (self.wait_time, self.wait_for_persistence) {
121 (Some(duration), true) => {
122 let ws_url = derive_ws_rpc_url(
123 self.benchmark.ws_rpc_url.as_deref(),
124 &self.benchmark.engine_rpc_url,
125 )?;
126 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
127 Some(PersistenceWaiter::with_duration_and_subscription(
128 duration,
129 sub,
130 self.persistence_threshold,
131 self.persistence_timeout,
132 ))
133 }
134 (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
135 (None, true) => {
136 let ws_url = derive_ws_rpc_url(
137 self.benchmark.ws_rpc_url.as_deref(),
138 &self.benchmark.engine_rpc_url,
139 )?;
140 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
141 Some(PersistenceWaiter::with_subscription(
142 sub,
143 self.persistence_threshold,
144 self.persistence_timeout,
145 ))
146 }
147 (None, false) => None,
148 };
149
150 let BenchContext {
151 benchmark_mode,
152 block_provider,
153 auth_provider,
154 mut next_block,
155 is_optimism,
156 use_reth_namespace,
157 rlp_blocks,
158 } = BenchContext::new(&self.benchmark, self.rpc_url).await?;
159
160 let total_blocks = benchmark_mode.total_blocks();
161
162 let mut metrics_scraper = MetricsScraper::maybe_new(self.benchmark.metrics_url.clone());
163
164 if use_reth_namespace {
165 info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
166 }
167
168 let buffer_size = self.rpc_block_buffer_size;
169
170 let (error_sender, mut error_receiver) = tokio::sync::oneshot::channel();
172 let (sender, mut receiver) = tokio::sync::mpsc::channel(buffer_size);
173
174 tokio::task::spawn(async move {
175 while benchmark_mode.contains(next_block) {
176 let block_res = block_provider
177 .get_block_by_number(next_block.into())
178 .full()
179 .await
180 .wrap_err_with(|| format!("Failed to fetch block by number {next_block}"));
181 let block = match block_res.and_then(|opt| opt.ok_or_eyre("Block not found")) {
182 Ok(block) => block,
183 Err(e) => {
184 tracing::error!(target: "reth-bench", "Failed to fetch block {next_block}: {e}");
185 let _ = error_sender.send(e);
186 break;
187 }
188 };
189
190 let rlp = if rlp_blocks {
191 let rlp = match block_provider.debug_get_raw_block(next_block.into()).await {
192 Ok(rlp) => rlp,
193 Err(e) => {
194 tracing::error!(target: "reth-bench", "Failed to fetch raw block {next_block}: {e}");
195 let _ = error_sender
196 .send(eyre::eyre!("Failed to fetch raw block {next_block}: {e}"));
197 break;
198 }
199 };
200 Some(rlp)
201 } else {
202 None
203 };
204
205 let head_block_hash = block.header.hash;
206 let safe_block_hash = block_provider
207 .get_block_by_number(block.header.number.saturating_sub(32).into());
208
209 let finalized_block_hash = block_provider
210 .get_block_by_number(block.header.number.saturating_sub(64).into());
211
212 let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
213
214 let safe_block_hash = match safe {
215 Ok(Some(block)) => block.header.hash,
216 Ok(None) | Err(_) => head_block_hash,
217 };
218
219 let finalized_block_hash = match finalized {
220 Ok(Some(block)) => block.header.hash,
221 Ok(None) | Err(_) => head_block_hash,
222 };
223
224 next_block += 1;
225 if let Err(e) = sender
226 .send((block, head_block_hash, safe_block_hash, finalized_block_hash, rlp))
227 .await
228 {
229 tracing::error!(target: "reth-bench", "Failed to send block data: {e}");
230 break;
231 }
232 }
233 });
234
235 let mut results = Vec::new();
236 let mut blocks_processed = 0u64;
237 let total_benchmark_duration = Instant::now();
238 let mut total_wait_time = Duration::ZERO;
239
240 while let Some((block, head, safe, finalized, rlp)) = {
241 let wait_start = Instant::now();
242 let result = receiver.recv().await;
243 total_wait_time += wait_start.elapsed();
244 result
245 } {
246 let gas_used = block.header.gas_used;
247 let gas_limit = block.header.gas_limit;
248 let block_number = block.header.number;
249 let transaction_count = block.transactions.len() as u64;
250
251 debug!(target: "reth-bench", ?block_number, "Sending payload");
252
253 let forkchoice_state = ForkchoiceState {
254 head_block_hash: head,
255 safe_block_hash: safe,
256 finalized_block_hash: finalized,
257 };
258
259 let (version, params) =
260 block_to_new_payload(block, is_optimism, rlp, use_reth_namespace)?;
261 let start = Instant::now();
262 let server_timings =
263 call_new_payload_with_reth(&auth_provider, version, params).await?;
264
265 let np_latency =
266 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
267 let new_payload_result = NewPayloadResult {
268 gas_used,
269 latency: np_latency,
270 persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
271 execution_cache_wait: server_timings
272 .as_ref()
273 .map(|t| t.execution_cache_wait)
274 .unwrap_or_default(),
275 sparse_trie_wait: server_timings
276 .as_ref()
277 .map(|t| t.sparse_trie_wait)
278 .unwrap_or_default(),
279 };
280
281 let fcu_start = Instant::now();
282 call_forkchoice_updated_with_reth(&auth_provider, version, forkchoice_state).await?;
283 let fcu_latency = fcu_start.elapsed();
284
285 let total_latency = if server_timings.is_some() {
286 np_latency + fcu_latency
290 } else {
291 start.elapsed()
292 };
293 let combined_result = CombinedResult {
294 block_number,
295 gas_limit,
296 transaction_count,
297 new_payload_result,
298 fcu_latency,
299 total_latency,
300 };
301
302 blocks_processed += 1;
305 let current_duration = total_benchmark_duration.elapsed() - total_wait_time;
306 let progress = match total_blocks {
307 Some(total) => format!("{blocks_processed}/{total}"),
308 None => format!("{blocks_processed}"),
309 };
310 info!(target: "reth-bench", progress, %combined_result);
311
312 if let Some(scraper) = metrics_scraper.as_mut() &&
313 let Err(err) = scraper.scrape_after_block(block_number).await
314 {
315 warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
316 }
317
318 if let Some(w) = &mut waiter {
319 w.on_block(block_number).await?;
320 }
321
322 let gas_row =
323 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
324 results.push((gas_row, combined_result));
325 }
326
327 if let Ok(error) = error_receiver.try_recv() {
329 return Err(error);
330 }
331
332 drop(waiter);
335
336 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
337 results.into_iter().unzip();
338
339 if let Some(ref path) = self.benchmark.output {
340 write_benchmark_results(path, &gas_output_results, &combined_results)?;
341 }
342
343 if let (Some(path), Some(scraper)) = (&self.benchmark.output, &metrics_scraper) {
344 scraper.write_csv(path)?;
345 }
346
347 let gas_output =
348 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
349
350 info!(
351 target: "reth-bench",
352 total_gas_used = gas_output.total_gas_used,
353 total_duration = ?gas_output.total_duration,
354 execution_duration = ?gas_output.execution_duration,
355 blocks_processed = gas_output.blocks_processed,
356 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
357 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
358 "Benchmark complete"
359 );
360
361 Ok(())
362 }
363}