1use crate::{
4 authenticated_transport::AuthenticatedTransportConnect,
5 bench::{
6 generate_big_block::{compute_payload_block_hash, BigBlockPayload},
7 helpers::parse_duration,
8 metrics_scraper::MetricsScraper,
9 output::{
10 write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
11 },
12 },
13 valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
14};
15use alloy_eip7928::bal::Bal;
16use alloy_eips::eip7928::BlockAccessList;
17use alloy_primitives::B256;
18use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
19use alloy_rpc_client::ClientBuilder;
20use alloy_rpc_types_engine::{
21 CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadEnvelopeV4,
22 ExecutionPayloadSidecar, ExecutionPayloadV4, ForkchoiceState, JwtSecret, PraguePayloadFields,
23};
24use clap::Parser;
25use eyre::Context;
26use reth_cli_runner::CliContext;
27use reth_engine_primitives::BigBlockData;
28use reth_node_api::EngineApiMessageVersion;
29use reth_node_core::args::WaitForPersistence;
30use reth_rpc_api::RethNewPayloadInput;
31use std::{
32 path::PathBuf,
33 time::{Duration, Instant},
34};
35use tracing::{debug, info, warn};
36use url::Url;
37
38#[derive(Debug, Parser)]
43pub struct Command {
44 #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
46 engine_rpc_url: String,
47
48 #[arg(long, value_name = "JWT_SECRET")]
50 jwt_secret: PathBuf,
51
52 #[arg(long, value_name = "PAYLOAD_DIR")]
54 payload_dir: PathBuf,
55
56 #[arg(long, value_name = "COUNT")]
59 count: Option<usize>,
60
61 #[arg(long, value_name = "SKIP", default_value = "0")]
63 skip: usize,
64
65 #[arg(long, value_name = "GAS_RAMP_DIR", hide = true)]
67 gas_ramp_dir: Option<PathBuf>,
68
69 #[arg(long, value_name = "OUTPUT")]
71 output: Option<PathBuf>,
72
73 #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
78 wait_time: Option<Duration>,
79
80 #[arg(long, default_value = "false", verbatim_doc_comment)]
86 reth_new_payload: bool,
87
88 #[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
94 bal: bool,
95
96 #[arg(
103 long = "wait-for-persistence",
104 value_name = "MODE",
105 num_args = 0..=1,
106 default_missing_value = "always",
107 value_parser = clap::value_parser!(WaitForPersistence),
108 requires = "reth_new_payload",
109 verbatim_doc_comment
110 )]
111 wait_for_persistence: Option<WaitForPersistence>,
112
113 #[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
118 no_wait_for_caches: bool,
119
120 #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
126 metrics_url: Option<String>,
127}
128
129struct LoadedPayload {
131 index: u64,
133 execution_data: ExecutionData,
135 block_hash: B256,
137 big_block_data: BigBlockData<ExecutionData>,
139 block_access_list: Option<BlockAccessList>,
141}
142
143impl Command {
144 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
146 info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
147
148 if let Some(duration) = self.wait_time {
150 info!(target: "reth-bench", "Using wait-time mode with {}ms minimum interval between blocks", duration.as_millis());
151 }
152 if self.reth_new_payload {
153 info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
154 if self.bal {
155 info!(target: "reth-bench", "Forwarding embedded block_access_list data");
156 }
157 }
158
159 let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
160
161 let jwt =
163 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
164 let jwt = JwtSecret::from_hex(jwt.trim())?;
165 let auth_url = Url::parse(&self.engine_rpc_url)?;
166
167 info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
168 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
169 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
170 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
171
172 let parent_block = auth_provider
174 .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
175 .await?
176 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
177
178 let initial_parent_hash = parent_block.header.hash;
179 let initial_parent_number = parent_block.header.number;
180
181 info!(
182 target: "reth-bench",
183 parent_hash = %initial_parent_hash,
184 parent_number = initial_parent_number,
185 "Using initial parent block"
186 );
187
188 if self.gas_ramp_dir.is_some() {
190 warn!(
191 target: "reth-bench",
192 "--gas-ramp-dir is deprecated and ignored."
193 );
194 }
195
196 let payloads = self.load_payloads()?;
198 if payloads.is_empty() {
199 return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
200 }
201 info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
202
203 let has_env_switches = payloads.iter().any(|p| !p.big_block_data.env_switches.is_empty());
204 let has_block_access_lists = payloads.iter().any(|p| {
205 p.block_access_list.as_ref().is_some_and(|bal: &BlockAccessList| !bal.is_empty())
206 });
207
208 if !self.reth_new_payload {
210 if has_env_switches {
211 warn!(
212 target: "reth-bench",
213 "Payloads contain env_switches but --reth-new-payload is not set. \
214 env_switches are only supported with reth_newPayload and will be ignored."
215 );
216 }
217 if has_block_access_lists {
218 warn!(
219 target: "reth-bench",
220 "Payloads contain block_access_list data but --reth-new-payload is not set. \
221 BALs are only forwarded with reth_newPayload and will be ignored."
222 );
223 }
224 } else if has_block_access_lists && !self.bal {
225 info!(
226 target: "reth-bench",
227 "Payloads contain block_access_list data but --bal is not set. BALs will be ignored."
228 );
229 }
230
231 let mut parent_hash = initial_parent_hash;
232
233 let mut results = Vec::new();
234 let total_benchmark_duration = Instant::now();
235
236 for (i, payload) in payloads.iter().enumerate() {
237 let execution_data = &payload.execution_data;
238 let mut block_hash = payload.block_hash;
239 let v1 = execution_data.payload.as_v1();
240
241 let gas_used = v1.gas_used;
242 let gas_limit = v1.gas_limit;
243 let block_number = v1.block_number;
244 let transaction_count = v1.transactions.len() as u64;
245
246 debug!(
247 target: "reth-bench",
248 payload = i + 1,
249 total = payloads.len(),
250 index = payload.index,
251 block_hash = %block_hash,
252 "Executing payload (newPayload + FCU)"
253 );
254
255 let start = Instant::now();
256
257 debug!(
258 target: "reth-bench",
259 method = "engine_newPayloadV4",
260 block_hash = %block_hash,
261 "Sending newPayload"
262 );
263
264 let (version, params) = if self.reth_new_payload {
265 let big_block_data_param = if payload.big_block_data.env_switches.is_empty() &&
266 payload.big_block_data.prior_block_hashes.is_empty()
267 {
268 None
269 } else {
270 Some(payload.big_block_data.clone())
271 };
272 let wait_for_persistence = self
273 .wait_for_persistence
274 .unwrap_or(WaitForPersistence::Never)
275 .rpc_value(block_number);
276
277 let mut execution_data = execution_data.clone();
282 if self.bal &&
283 let Some(bal) = &payload.block_access_list
284 {
285 let encoded_bal: alloy_primitives::Bytes =
286 alloy_rlp::encode(Bal::from(bal.clone())).into();
287
288 if execution_data.payload.as_v4().is_none() {
290 execution_data.payload = upgrade_to_v4(execution_data.payload, encoded_bal);
291 } else {
292 execution_data.payload.as_v4_mut().unwrap().block_access_list = encoded_bal;
293 }
294
295 execution_data.payload.as_v1_mut().parent_hash = parent_hash;
298
299 block_hash = compute_payload_block_hash(&execution_data)?;
302 execution_data.payload.as_v1_mut().block_hash = block_hash;
303 }
304
305 (
306 None,
307 serde_json::to_value((
308 RethNewPayloadInput::ExecutionData(execution_data),
309 wait_for_persistence,
310 self.no_wait_for_caches.then_some(false),
311 big_block_data_param,
312 ))?,
313 )
314 } else {
315 let requests =
316 execution_data.sidecar.requests().cloned().unwrap_or_default().to_vec();
317 (
318 Some(EngineApiMessageVersion::V4),
319 serde_json::to_value((
320 execution_data.payload.clone(),
321 Vec::<B256>::new(),
322 B256::ZERO,
323 requests,
324 ))?,
325 )
326 };
327
328 let server_timings =
329 call_new_payload_with_reth(&auth_provider, version, params).await?;
330
331 let np_latency =
332 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
333 let new_payload_result = NewPayloadResult {
334 gas_used,
335 latency: np_latency,
336 persistence_wait: server_timings
337 .as_ref()
338 .map(|t| t.persistence_wait)
339 .unwrap_or_default(),
340 execution_cache_wait: server_timings
341 .as_ref()
342 .map(|t| t.execution_cache_wait)
343 .unwrap_or_default(),
344 sparse_trie_wait: server_timings
345 .as_ref()
346 .map(|t| t.sparse_trie_wait)
347 .unwrap_or_default(),
348 };
349
350 let fcu_state = ForkchoiceState {
351 head_block_hash: block_hash,
352 safe_block_hash: parent_hash,
353 finalized_block_hash: parent_hash,
354 };
355
356 let fcu_start = Instant::now();
357 call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
358 let fcu_latency = fcu_start.elapsed();
359
360 let total_latency =
361 if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
362
363 let combined_result = CombinedResult {
364 block_number,
365 gas_limit,
366 transaction_count,
367 new_payload_result,
368 fcu_latency,
369 total_latency,
370 };
371
372 let current_duration = total_benchmark_duration.elapsed();
373 let progress = format!("{}/{}", i + 1, payloads.len());
374 info!(target: "reth-bench", progress, %combined_result);
375
376 if let Some(scraper) = metrics_scraper.as_mut() &&
377 let Err(err) = scraper.scrape_after_block(block_number).await
378 {
379 tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
380 }
381
382 if let Some(wait_time) = self.wait_time {
383 let remaining = wait_time.saturating_sub(start.elapsed());
384 if !remaining.is_zero() {
385 tokio::time::sleep(remaining).await;
386 }
387 }
388
389 let gas_row =
390 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
391 results.push((gas_row, combined_result));
392
393 parent_hash = block_hash;
394 }
395
396 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
397 results.into_iter().unzip();
398
399 if let Some(ref path) = self.output {
400 write_benchmark_results(path, &gas_output_results, &combined_results)?;
401 }
402
403 if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
404 scraper.write_csv(path)?;
405 }
406
407 let gas_output =
408 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
409 info!(
410 target: "reth-bench",
411 total_gas_used = gas_output.total_gas_used,
412 total_duration = ?gas_output.total_duration,
413 execution_duration = ?gas_output.execution_duration,
414 blocks_processed = gas_output.blocks_processed,
415 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
416 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
417 "Benchmark complete"
418 );
419
420 Ok(())
421 }
422
423 fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
428 let mut payloads = Vec::new();
429
430 let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
433 .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
434 .filter_map(|e| e.ok())
435 .filter(|e| {
436 let name = e.file_name();
437 let name_str = name.to_string_lossy();
438 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
439 (name_str.starts_with("payload_block_") ||
440 name_str.starts_with("big_block_"))
441 })
442 .collect();
443
444 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
447 .into_iter()
448 .filter_map(|e| {
449 let name = e.file_name();
450 let name_str = name.to_string_lossy();
451 let index = if let Some(rest) = name_str.strip_prefix("payload_block_") {
452 rest.strip_suffix(".json")?.parse::<u64>().ok()?
453 } else if let Some(rest) = name_str.strip_prefix("big_block_") {
454 let rest = rest.strip_suffix(".json")?;
456 rest.split("_to_").next()?.parse::<u64>().ok()?
457 } else {
458 return None;
459 };
460 Some((index, e.path()))
461 })
462 .collect();
463
464 indexed_paths.sort_by_key(|(idx, _)| *idx);
465
466 let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
468 let indexed_paths: Vec<_> = match self.count {
469 Some(count) => indexed_paths.into_iter().take(count).collect(),
470 None => indexed_paths,
471 };
472
473 for (index, path) in indexed_paths {
475 let content = std::fs::read_to_string(&path)
476 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
477
478 let (execution_data, big_block_data, block_access_list) = if let Ok(big_block) =
480 serde_json::from_str::<BigBlockPayload>(&content)
481 {
482 (big_block.execution_data, big_block.big_block_data, big_block.block_access_list)
483 } else {
484 let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
485 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
486 let execution_data = ExecutionData {
487 payload: envelope.envelope_inner.execution_payload.clone().into(),
488 sidecar: ExecutionPayloadSidecar::v4(
489 CancunPayloadFields {
490 versioned_hashes: Vec::new(),
491 parent_beacon_block_root: B256::ZERO,
492 },
493 PraguePayloadFields {
494 requests: envelope.execution_requests.clone().into(),
495 },
496 ),
497 };
498 (execution_data, BigBlockData::default(), None)
499 };
500
501 let block_hash = execution_data.payload.as_v1().block_hash;
502
503 debug!(
504 target: "reth-bench",
505 index = index,
506 block_hash = %block_hash,
507 env_switches = big_block_data.env_switches.len(),
508 prior_block_hashes = big_block_data.prior_block_hashes.len(),
509 bal_accounts = block_access_list.as_ref().map_or(0, Vec::len),
510 path = %path.display(),
511 "Loaded payload"
512 );
513
514 payloads.push(LoadedPayload {
515 index,
516 execution_data,
517 block_hash,
518 big_block_data,
519 block_access_list,
520 });
521 }
522
523 Ok(payloads)
524 }
525}
526
527fn upgrade_to_v4(
530 payload: ExecutionPayload,
531 block_access_list: alloy_primitives::Bytes,
532) -> ExecutionPayload {
533 use alloy_rpc_types_engine::{ExecutionPayloadV2, ExecutionPayloadV3};
534
535 let v3 = match payload {
536 ExecutionPayload::V4(_) => unreachable!("caller checks as_v4().is_none()"),
537 ExecutionPayload::V3(v3) => v3,
538 ExecutionPayload::V2(v2) => {
539 ExecutionPayloadV3 { payload_inner: v2, blob_gas_used: 0, excess_blob_gas: 0 }
540 }
541 ExecutionPayload::V1(v1) => ExecutionPayloadV3 {
542 payload_inner: ExecutionPayloadV2 { payload_inner: v1, withdrawals: Vec::new() },
543 blob_gas_used: 0,
544 excess_blob_gas: 0,
545 },
546 };
547
548 ExecutionPayload::V4(ExecutionPayloadV4 {
549 payload_inner: v3,
550 block_access_list,
551 slot_number: 0,
552 })
553}