1use crate::{
15 authenticated_transport::AuthenticatedTransportConnect,
16 bench::{
17 helpers::parse_duration,
18 metrics_scraper::MetricsScraper,
19 output::{
20 write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
21 TotalGasOutput, TotalGasRow,
22 },
23 persistence_waiter::{
24 derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
25 },
26 },
27 valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
28};
29use alloy_primitives::B256;
30use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
31use alloy_rpc_client::ClientBuilder;
32use alloy_rpc_types_engine::{
33 CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
34 ForkchoiceState, JwtSecret, PraguePayloadFields,
35};
36use clap::Parser;
37use eyre::Context;
38use reth_cli_runner::CliContext;
39use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
40use reth_node_api::EngineApiMessageVersion;
41use reth_rpc_api::RethNewPayloadInput;
42use std::{
43 path::PathBuf,
44 time::{Duration, Instant},
45};
46use tracing::{debug, info};
47use url::Url;
48
/// Replays execution payloads stored on disk against an engine RPC endpoint and records
/// per-block `newPayload` / forkchoice-update latencies.
#[derive(Debug, Parser)]
pub struct Command {
    /// The engine RPC URL. Requests are authenticated with the JWT secret.
    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
    engine_rpc_url: String,

    /// Path to a file containing the hex-encoded JWT secret for the engine API.
    #[arg(long, value_name = "JWT_SECRET")]
    jwt_secret: PathBuf,

    /// Directory containing the main payload files, named `payload_block_<index>.json`.
    #[arg(long, value_name = "PAYLOAD_DIR")]
    payload_dir: PathBuf,

    /// Maximum number of payloads to replay (applied after `--skip`).
    /// All remaining payloads are replayed when omitted.
    #[arg(long, value_name = "COUNT")]
    count: Option<usize>,

    /// Number of payloads (in ascending index order) to skip before replaying.
    #[arg(long, value_name = "SKIP", default_value = "0")]
    skip: usize,

    /// Optional directory of gas ramp payload files (`payload_block_<number>.json`).
    /// These are executed, in ascending block-number order, before the main payloads.
    #[arg(long, value_name = "GAS_RAMP_DIR")]
    gas_ramp_dir: Option<PathBuf>,

    /// Output path handed to the benchmark result writer (and to the metrics CSV writer
    /// when metrics scraping is active).
    #[arg(long, value_name = "OUTPUT")]
    output: Option<PathBuf>,

    /// Delay to apply between blocks (parsed as a duration).
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    /// Wait on a persistence subscription while replaying blocks.
    ///
    /// Can be combined with `--wait-time`.
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    wait_for_persistence: bool,

    /// Persistence threshold handed to the persistence waiter.
    ///
    /// Only used together with `--wait-for-persistence`; the waiter is reported as
    /// waiting after every `threshold + 1` blocks.
    #[arg(
        long = "persistence-threshold",
        value_name = "PERSISTENCE_THRESHOLD",
        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
        verbatim_doc_comment
    )]
    persistence_threshold: u64,

    /// Timeout used when setting up the persistence subscription and by the
    /// persistence waiter.
    #[arg(
        long = "persistence-timeout",
        value_name = "PERSISTENCE_TIMEOUT",
        value_parser = parse_duration,
        default_value = "120s",
        verbatim_doc_comment
    )]
    persistence_timeout: Duration,

    /// WebSocket RPC URL for the persistence subscription.
    /// Only consulted with `--wait-for-persistence`; the final URL is resolved from this
    /// value together with `--engine-rpc-url`.
    #[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
    ws_rpc_url: Option<String>,

    /// Send the main payloads through the `reth_newPayload` and
    /// `reth_forkchoiceUpdated` endpoints instead of the versioned engine API calls.
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    reth_new_payload: bool,

    /// URL handed to the metrics scraper.
    /// Metrics are scraped after every main payload and written out as CSV when
    /// `--output` is also given.
    #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
    metrics_url: Option<String>,
}
149
/// A main benchmark payload that has been read from disk and parsed.
struct LoadedPayload {
    /// Index parsed from the `payload_block_<index>.json` file name; used for ordering
    /// and logging.
    index: u64,
    /// The deserialized payload envelope.
    envelope: ExecutionPayloadEnvelopeV4,
    /// Block hash copied out of the envelope's innermost execution payload.
    block_hash: B256,
}
159
/// A gas ramp payload that has been read from disk and parsed.
struct GasRampPayload {
    /// Block number parsed from the `payload_block_<number>.json` file name.
    block_number: u64,
    /// Engine API version taken from the file's `version` field; `None` when the file
    /// does not specify one.
    // NOTE(review): `None` presumably selects the `reth_newPayload` endpoint, mirroring
    // how the main replay loop passes `None` in that mode — confirm against
    // `call_new_payload_with_reth`.
    version: Option<EngineApiMessageVersion>,
    /// The parsed file contents (block hash and request params).
    file: GasRampPayloadFile,
}
171
172impl Command {
173 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
175 info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
176
177 if let Some(duration) = self.wait_time {
179 info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
180 }
181 if self.wait_for_persistence {
182 info!(
183 target: "reth-bench",
184 "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
185 self.persistence_threshold + 1,
186 self.persistence_threshold
187 );
188 }
189 if self.reth_new_payload {
190 info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
191 }
192
193 let mut waiter = match (self.wait_time, self.wait_for_persistence) {
196 (Some(duration), true) => {
197 let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
198 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
199 Some(PersistenceWaiter::with_duration_and_subscription(
200 duration,
201 sub,
202 self.persistence_threshold,
203 self.persistence_timeout,
204 ))
205 }
206 (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
207 (None, true) => {
208 let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
209 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
210 Some(PersistenceWaiter::with_subscription(
211 sub,
212 self.persistence_threshold,
213 self.persistence_timeout,
214 ))
215 }
216 (None, false) => None,
217 };
218
219 let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
220
221 let jwt =
223 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
224 let jwt = JwtSecret::from_hex(jwt.trim())?;
225 let auth_url = Url::parse(&self.engine_rpc_url)?;
226
227 info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
228 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
229 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
230 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
231
232 let parent_block = auth_provider
234 .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
235 .await?
236 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
237
238 let initial_parent_hash = parent_block.header.hash;
239 let initial_parent_number = parent_block.header.number;
240
241 info!(
242 target: "reth-bench",
243 parent_hash = %initial_parent_hash,
244 parent_number = initial_parent_number,
245 "Using initial parent block"
246 );
247
248 let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
250 let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
251 if payloads.is_empty() {
252 return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
253 }
254 info!(target: "reth-bench", count = payloads.len(), "Loaded gas ramp payloads from disk");
255 payloads
256 } else {
257 Vec::new()
258 };
259
260 let payloads = self.load_payloads()?;
261 if payloads.is_empty() {
262 return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
263 }
264 info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
265
266 let mut parent_hash = initial_parent_hash;
267
268 for (i, payload) in gas_ramp_payloads.iter().enumerate() {
270 info!(
271 target: "reth-bench",
272 gas_ramp_payload = i + 1,
273 total = gas_ramp_payloads.len(),
274 block_number = payload.block_number,
275 block_hash = %payload.file.block_hash,
276 "Executing gas ramp payload (newPayload + FCU)"
277 );
278
279 let _ = call_new_payload_with_reth(
280 &auth_provider,
281 payload.version,
282 payload.file.params.clone(),
283 )
284 .await?;
285
286 let fcu_state = ForkchoiceState {
287 head_block_hash: payload.file.block_hash,
288 safe_block_hash: parent_hash,
289 finalized_block_hash: parent_hash,
290 };
291 call_forkchoice_updated_with_reth(&auth_provider, payload.version, fcu_state).await?;
292
293 info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
294
295 if let Some(w) = &mut waiter {
296 w.on_block(payload.block_number).await?;
297 }
298
299 parent_hash = payload.file.block_hash;
300 }
301
302 if !gas_ramp_payloads.is_empty() {
303 info!(target: "reth-bench", count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
304 }
305
306 let mut results = Vec::new();
307 let total_benchmark_duration = Instant::now();
308
309 for (i, payload) in payloads.iter().enumerate() {
310 let envelope = &payload.envelope;
311 let block_hash = payload.block_hash;
312 let execution_payload = &envelope.envelope_inner.execution_payload;
313 let inner_payload = &execution_payload.payload_inner.payload_inner;
314
315 let gas_used = inner_payload.gas_used;
316 let gas_limit = inner_payload.gas_limit;
317 let block_number = inner_payload.block_number;
318 let transaction_count =
319 execution_payload.payload_inner.payload_inner.transactions.len() as u64;
320
321 debug!(
322 target: "reth-bench",
323 payload = i + 1,
324 total = payloads.len(),
325 index = payload.index,
326 block_hash = %block_hash,
327 "Executing payload (newPayload + FCU)"
328 );
329
330 let start = Instant::now();
331
332 debug!(
333 target: "reth-bench",
334 method = "engine_newPayloadV4",
335 block_hash = %block_hash,
336 "Sending newPayload"
337 );
338
339 let (version, params) = if self.reth_new_payload {
340 let reth_data = ExecutionData {
341 payload: execution_payload.clone().into(),
342 sidecar: ExecutionPayloadSidecar::v4(
343 CancunPayloadFields {
344 versioned_hashes: Vec::new(),
345 parent_beacon_block_root: B256::ZERO,
346 },
347 PraguePayloadFields {
348 requests: envelope.execution_requests.clone().into(),
349 },
350 ),
351 };
352 (None, serde_json::to_value((RethNewPayloadInput::ExecutionData(reth_data),))?)
353 } else {
354 (
355 Some(EngineApiMessageVersion::V4),
356 serde_json::to_value((
357 execution_payload.clone(),
358 Vec::<B256>::new(),
359 B256::ZERO,
360 envelope.execution_requests.to_vec(),
361 ))?,
362 )
363 };
364
365 let server_timings =
366 call_new_payload_with_reth(&auth_provider, version, params).await?;
367
368 let np_latency =
369 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
370 let new_payload_result = NewPayloadResult {
371 gas_used,
372 latency: np_latency,
373 persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
374 execution_cache_wait: server_timings
375 .as_ref()
376 .map(|t| t.execution_cache_wait)
377 .unwrap_or_default(),
378 sparse_trie_wait: server_timings
379 .as_ref()
380 .map(|t| t.sparse_trie_wait)
381 .unwrap_or_default(),
382 };
383
384 let fcu_state = ForkchoiceState {
385 head_block_hash: block_hash,
386 safe_block_hash: parent_hash,
387 finalized_block_hash: parent_hash,
388 };
389
390 let fcu_start = Instant::now();
391 call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
392 let fcu_latency = fcu_start.elapsed();
393
394 let total_latency =
395 if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
396
397 let combined_result = CombinedResult {
398 block_number,
399 gas_limit,
400 transaction_count,
401 new_payload_result,
402 fcu_latency,
403 total_latency,
404 };
405
406 let current_duration = total_benchmark_duration.elapsed();
407 let progress = format!("{}/{}", i + 1, payloads.len());
408 info!(target: "reth-bench", progress, %combined_result);
409
410 if let Some(scraper) = metrics_scraper.as_mut() &&
411 let Err(err) = scraper.scrape_after_block(block_number).await
412 {
413 tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
414 }
415
416 if let Some(w) = &mut waiter {
417 w.on_block(block_number).await?;
418 }
419
420 let gas_row =
421 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
422 results.push((gas_row, combined_result));
423
424 parent_hash = block_hash;
425 }
426
427 drop(waiter);
430
431 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
432 results.into_iter().unzip();
433
434 if let Some(ref path) = self.output {
435 write_benchmark_results(path, &gas_output_results, &combined_results)?;
436 }
437
438 if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
439 scraper.write_csv(path)?;
440 }
441
442 let gas_output =
443 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
444 info!(
445 target: "reth-bench",
446 total_gas_used = gas_output.total_gas_used,
447 total_duration = ?gas_output.total_duration,
448 execution_duration = ?gas_output.execution_duration,
449 blocks_processed = gas_output.blocks_processed,
450 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
451 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
452 "Benchmark complete"
453 );
454
455 Ok(())
456 }
457
458 fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
460 let mut payloads = Vec::new();
461
462 let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
464 .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
465 .filter_map(|e| e.ok())
466 .filter(|e| {
467 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
468 e.file_name().to_string_lossy().starts_with("payload_block_")
469 })
470 .collect();
471
472 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
474 .into_iter()
475 .filter_map(|e| {
476 let name = e.file_name();
477 let name_str = name.to_string_lossy();
478 let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
480 let index: u64 = index_str.parse().ok()?;
481 Some((index, e.path()))
482 })
483 .collect();
484
485 indexed_paths.sort_by_key(|(idx, _)| *idx);
486
487 let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
489 let indexed_paths: Vec<_> = match self.count {
490 Some(count) => indexed_paths.into_iter().take(count).collect(),
491 None => indexed_paths,
492 };
493
494 for (index, path) in indexed_paths {
496 let content = std::fs::read_to_string(&path)
497 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
498 let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
499 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
500
501 let block_hash =
502 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
503
504 debug!(
505 target: "reth-bench",
506 index = index,
507 block_hash = %block_hash,
508 path = %path.display(),
509 "Loaded payload"
510 );
511
512 payloads.push(LoadedPayload { index, envelope, block_hash });
513 }
514
515 Ok(payloads)
516 }
517
518 fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
520 let mut payloads = Vec::new();
521
522 let entries: Vec<_> = std::fs::read_dir(dir)
523 .wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
524 .filter_map(|e| e.ok())
525 .filter(|e| {
526 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
527 e.file_name().to_string_lossy().starts_with("payload_block_")
528 })
529 .collect();
530
531 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
533 .into_iter()
534 .filter_map(|e| {
535 let name = e.file_name();
536 let name_str = name.to_string_lossy();
537 let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
539 let block_number: u64 = block_str.parse().ok()?;
540 Some((block_number, e.path()))
541 })
542 .collect();
543
544 indexed_paths.sort_by_key(|(num, _)| *num);
545
546 for (block_number, path) in indexed_paths {
547 let content = std::fs::read_to_string(&path)
548 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
549 let file: GasRampPayloadFile = serde_json::from_str(&content)
550 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
551
552 let version = if let Some(version) = file.version {
553 match version {
554 1 => EngineApiMessageVersion::V1,
555 2 => EngineApiMessageVersion::V2,
556 3 => EngineApiMessageVersion::V3,
557 4 => EngineApiMessageVersion::V4,
558 5 => EngineApiMessageVersion::V5,
559 v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
560 }
561 .into()
562 } else {
563 None
564 };
565
566 info!(
567 block_number,
568 block_hash = %file.block_hash,
569 path = %path.display(),
570 "Loaded gas ramp payload"
571 );
572
573 payloads.push(GasRampPayload { block_number, version, file });
574 }
575
576 Ok(payloads)
577 }
578}