1use crate::{
15 authenticated_transport::AuthenticatedTransportConnect,
16 bench::{
17 helpers::parse_duration,
18 output::{
19 write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
20 TotalGasOutput, TotalGasRow,
21 },
22 persistence_waiter::{
23 derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
24 },
25 },
26 valid_payload::{call_forkchoice_updated, call_new_payload},
27};
28use alloy_primitives::B256;
29use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
30use alloy_rpc_client::ClientBuilder;
31use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
32use clap::Parser;
33use eyre::Context;
34use reth_cli_runner::CliContext;
35use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
36use reth_node_api::EngineApiMessageVersion;
37use std::{
38 path::PathBuf,
39 time::{Duration, Instant},
40};
41use tracing::{debug, info};
42use url::Url;
43
/// CLI arguments for replaying stored execution payloads against an Engine API endpoint.
///
/// Payloads are read from disk, sent through `engine_newPayloadV4` followed by
/// `engine_forkchoiceUpdatedV3`, and the measured latencies are reported.
#[derive(Debug, Parser)]
pub struct Command {
    /// URL of the authenticated Engine API endpoint the payloads are sent to.
    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
    engine_rpc_url: String,

    /// Path to the file holding the hex-encoded JWT secret used to authenticate with the
    /// engine endpoint.
    #[arg(long, value_name = "JWT_SECRET")]
    jwt_secret: PathBuf,

    /// Directory containing the payloads to replay, stored as `payload_block_<index>.json`.
    #[arg(long, value_name = "PAYLOAD_DIR")]
    payload_dir: PathBuf,

    /// Maximum number of payloads to replay.
    /// When omitted, every payload remaining after `--skip` is replayed.
    #[arg(long, value_name = "COUNT")]
    count: Option<usize>,

    /// Number of payloads to skip, counted from the lowest index.
    #[arg(long, value_name = "SKIP", default_value = "0")]
    skip: usize,

    /// Optional directory of gas ramp payloads (`payload_block_<block_number>.json`).
    /// They are executed before the main payloads and are not part of the reported results.
    #[arg(long, value_name = "GAS_RAMP_DIR")]
    gas_ramp_dir: Option<PathBuf>,

    /// Optional output location the benchmark results are written to.
    #[arg(long, value_name = "OUTPUT")]
    output: Option<PathBuf>,

    /// Enables wait-time mode: a delay of this duration is applied between blocks.
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    /// Enables waiting for block persistence during the replay.
    ///
    /// When set, a persistence subscription is opened over the WebSocket RPC URL and the
    /// replay waits after every `persistence-threshold + 1` blocks.
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    wait_for_persistence: bool,

    /// Persistence threshold used together with `--wait-for-persistence`.
    ///
    /// Defaults to the engine's `DEFAULT_PERSISTENCE_THRESHOLD`.
    #[arg(
        long = "persistence-threshold",
        value_name = "PERSISTENCE_THRESHOLD",
        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
        verbatim_doc_comment
    )]
    persistence_threshold: u64,

    /// Timeout used for the persistence subscription and the persistence waiter.
    ///
    /// Only relevant together with `--wait-for-persistence`.
    #[arg(
        long = "persistence-timeout",
        value_name = "PERSISTENCE_TIMEOUT",
        value_parser = parse_duration,
        default_value = "120s",
        verbatim_doc_comment
    )]
    persistence_timeout: Duration,

    /// Optional WebSocket RPC URL used for the persistence subscription.
    ///
    /// It is handed to `derive_ws_rpc_url` together with the engine RPC URL.
    // NOTE(review): presumably the WS URL is derived from the engine URL when this is
    // omitted -- confirm against `derive_ws_rpc_url`.
    #[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
    ws_rpc_url: Option<String>,
}
128
/// A main benchmark payload read from `payload_dir`.
struct LoadedPayload {
    /// Index parsed from the `payload_block_<index>.json` file name; determines replay order.
    index: u64,
    /// The payload envelope deserialized from the file.
    envelope: ExecutionPayloadEnvelopeV4,
    /// Block hash copied out of the envelope's execution payload at load time.
    block_hash: B256,
}
138
/// A gas ramp payload read from the gas ramp directory.
struct GasRampPayload {
    /// Block number parsed from the `payload_block_<block_number>.json` file name.
    block_number: u64,
    /// Engine API message version, mapped from the numeric `version` stored in the file.
    version: EngineApiMessageVersion,
    /// The deserialized file contents (block hash, request params, version).
    file: GasRampPayloadFile,
}
148
149impl Command {
150 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
152 info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
153
154 if let Some(duration) = self.wait_time {
156 info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
157 }
158 if self.wait_for_persistence {
159 info!(
160 target: "reth-bench",
161 "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
162 self.persistence_threshold + 1,
163 self.persistence_threshold
164 );
165 }
166
167 let mut waiter = match (self.wait_time, self.wait_for_persistence) {
170 (Some(duration), true) => {
171 let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
172 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
173 Some(PersistenceWaiter::with_duration_and_subscription(
174 duration,
175 sub,
176 self.persistence_threshold,
177 self.persistence_timeout,
178 ))
179 }
180 (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
181 (None, true) => {
182 let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
183 let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
184 Some(PersistenceWaiter::with_subscription(
185 sub,
186 self.persistence_threshold,
187 self.persistence_timeout,
188 ))
189 }
190 (None, false) => None,
191 };
192
193 let jwt =
195 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
196 let jwt = JwtSecret::from_hex(jwt.trim())?;
197 let auth_url = Url::parse(&self.engine_rpc_url)?;
198
199 info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
200 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
201 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
202 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
203
204 let parent_block = auth_provider
206 .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
207 .await?
208 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
209
210 let initial_parent_hash = parent_block.header.hash;
211 let initial_parent_number = parent_block.header.number;
212
213 info!(
214 target: "reth-bench",
215 parent_hash = %initial_parent_hash,
216 parent_number = initial_parent_number,
217 "Using initial parent block"
218 );
219
220 let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
222 let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
223 if payloads.is_empty() {
224 return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
225 }
226 info!(target: "reth-bench", count = payloads.len(), "Loaded gas ramp payloads from disk");
227 payloads
228 } else {
229 Vec::new()
230 };
231
232 let payloads = self.load_payloads()?;
233 if payloads.is_empty() {
234 return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
235 }
236 info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
237
238 let mut parent_hash = initial_parent_hash;
239
240 for (i, payload) in gas_ramp_payloads.iter().enumerate() {
242 info!(
243 target: "reth-bench",
244 gas_ramp_payload = i + 1,
245 total = gas_ramp_payloads.len(),
246 block_number = payload.block_number,
247 block_hash = %payload.file.block_hash,
248 "Executing gas ramp payload (newPayload + FCU)"
249 );
250
251 call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
252
253 let fcu_state = ForkchoiceState {
254 head_block_hash: payload.file.block_hash,
255 safe_block_hash: parent_hash,
256 finalized_block_hash: parent_hash,
257 };
258 call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
259
260 info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
261
262 if let Some(w) = &mut waiter {
263 w.on_block(payload.block_number).await?;
264 }
265
266 parent_hash = payload.file.block_hash;
267 }
268
269 if !gas_ramp_payloads.is_empty() {
270 info!(target: "reth-bench", count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
271 }
272
273 let mut results = Vec::new();
274 let total_benchmark_duration = Instant::now();
275
276 for (i, payload) in payloads.iter().enumerate() {
277 let envelope = &payload.envelope;
278 let block_hash = payload.block_hash;
279 let execution_payload = &envelope.envelope_inner.execution_payload;
280 let inner_payload = &execution_payload.payload_inner.payload_inner;
281
282 let gas_used = inner_payload.gas_used;
283 let gas_limit = inner_payload.gas_limit;
284 let block_number = inner_payload.block_number;
285 let transaction_count =
286 execution_payload.payload_inner.payload_inner.transactions.len() as u64;
287
288 debug!(
289 target: "reth-bench",
290 payload = i + 1,
291 total = payloads.len(),
292 index = payload.index,
293 block_hash = %block_hash,
294 "Executing payload (newPayload + FCU)"
295 );
296
297 let start = Instant::now();
298
299 debug!(
300 target: "reth-bench",
301 method = "engine_newPayloadV4",
302 block_hash = %block_hash,
303 "Sending newPayload"
304 );
305
306 let status = auth_provider
307 .new_payload_v4(
308 execution_payload.clone(),
309 vec![],
310 B256::ZERO,
311 envelope.execution_requests.to_vec(),
312 )
313 .await?;
314
315 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
316
317 if !status.is_valid() {
318 return Err(eyre::eyre!("Payload rejected: {:?}", status));
319 }
320
321 let fcu_state = ForkchoiceState {
322 head_block_hash: block_hash,
323 safe_block_hash: parent_hash,
324 finalized_block_hash: parent_hash,
325 };
326
327 debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
328
329 let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
330
331 let total_latency = start.elapsed();
332 let fcu_latency = total_latency - new_payload_result.latency;
333
334 let combined_result = CombinedResult {
335 block_number,
336 gas_limit,
337 transaction_count,
338 new_payload_result,
339 fcu_latency,
340 total_latency,
341 };
342
343 let current_duration = total_benchmark_duration.elapsed();
344 let progress = format!("{}/{}", i + 1, payloads.len());
345 info!(target: "reth-bench", progress, %combined_result);
346
347 if let Some(w) = &mut waiter {
348 w.on_block(block_number).await?;
349 }
350
351 let gas_row =
352 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
353 results.push((gas_row, combined_result));
354
355 debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully");
356 parent_hash = block_hash;
357 }
358
359 drop(waiter);
362
363 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
364 results.into_iter().unzip();
365
366 if let Some(ref path) = self.output {
367 write_benchmark_results(path, &gas_output_results, &combined_results)?;
368 }
369
370 let gas_output =
371 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
372 info!(
373 target: "reth-bench",
374 total_gas_used = gas_output.total_gas_used,
375 total_duration = ?gas_output.total_duration,
376 execution_duration = ?gas_output.execution_duration,
377 blocks_processed = gas_output.blocks_processed,
378 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
379 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
380 "Benchmark complete"
381 );
382
383 Ok(())
384 }
385
386 fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
388 let mut payloads = Vec::new();
389
390 let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
392 .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
393 .filter_map(|e| e.ok())
394 .filter(|e| {
395 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
396 e.file_name().to_string_lossy().starts_with("payload_block_")
397 })
398 .collect();
399
400 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
402 .into_iter()
403 .filter_map(|e| {
404 let name = e.file_name();
405 let name_str = name.to_string_lossy();
406 let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
408 let index: u64 = index_str.parse().ok()?;
409 Some((index, e.path()))
410 })
411 .collect();
412
413 indexed_paths.sort_by_key(|(idx, _)| *idx);
414
415 let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
417 let indexed_paths: Vec<_> = match self.count {
418 Some(count) => indexed_paths.into_iter().take(count).collect(),
419 None => indexed_paths,
420 };
421
422 for (index, path) in indexed_paths {
424 let content = std::fs::read_to_string(&path)
425 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
426 let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
427 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
428
429 let block_hash =
430 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
431
432 debug!(
433 target: "reth-bench",
434 index = index,
435 block_hash = %block_hash,
436 path = %path.display(),
437 "Loaded payload"
438 );
439
440 payloads.push(LoadedPayload { index, envelope, block_hash });
441 }
442
443 Ok(payloads)
444 }
445
446 fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
448 let mut payloads = Vec::new();
449
450 let entries: Vec<_> = std::fs::read_dir(dir)
451 .wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
452 .filter_map(|e| e.ok())
453 .filter(|e| {
454 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
455 e.file_name().to_string_lossy().starts_with("payload_block_")
456 })
457 .collect();
458
459 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
461 .into_iter()
462 .filter_map(|e| {
463 let name = e.file_name();
464 let name_str = name.to_string_lossy();
465 let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
467 let block_number: u64 = block_str.parse().ok()?;
468 Some((block_number, e.path()))
469 })
470 .collect();
471
472 indexed_paths.sort_by_key(|(num, _)| *num);
473
474 for (block_number, path) in indexed_paths {
475 let content = std::fs::read_to_string(&path)
476 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
477 let file: GasRampPayloadFile = serde_json::from_str(&content)
478 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
479
480 let version = match file.version {
481 1 => EngineApiMessageVersion::V1,
482 2 => EngineApiMessageVersion::V2,
483 3 => EngineApiMessageVersion::V3,
484 4 => EngineApiMessageVersion::V4,
485 5 => EngineApiMessageVersion::V5,
486 v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
487 };
488
489 info!(
490 block_number,
491 block_hash = %file.block_hash,
492 path = %path.display(),
493 "Loaded gas ramp payload"
494 );
495
496 payloads.push(GasRampPayload { block_number, version, file });
497 }
498
499 Ok(payloads)
500 }
501}