1use crate::{
4 authenticated_transport::AuthenticatedTransportConnect,
5 bench::{
6 helpers::parse_duration,
7 metrics_scraper::MetricsScraper,
8 output::{
9 write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
10 },
11 },
12 valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
13};
14use alloy_primitives::B256;
15use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
16use alloy_rpc_client::ClientBuilder;
17use alloy_rpc_types_engine::{
18 CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
19 ForkchoiceState, JwtSecret, PraguePayloadFields,
20};
21use clap::Parser;
22use eyre::Context;
23use reth_cli_runner::CliContext;
24use reth_node_api::EngineApiMessageVersion;
25use reth_node_core::args::WaitForPersistence;
26use reth_rpc_api::RethNewPayloadInput;
27use std::{
28 path::PathBuf,
29 time::{Duration, Instant},
30};
31use tracing::{debug, info, warn};
32use url::Url;
33
// Command-line arguments for replaying stored execution payloads against an engine API
// endpoint and recording per-block latency.
//
// NOTE: plain `//` comments are used on this struct and its fields on purpose. With clap's
// derive API, `///` doc comments become `--help` text, so adding them here would change the
// CLI output.
#[derive(Debug, Parser)]
pub struct Command {
    // Engine API endpoint. Parsed with `Url::parse` in `execute` and used for the
    // JWT-authenticated connection.
    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
    engine_rpc_url: String,

    // Path to a file containing the JWT secret. The file content is trimmed and parsed as hex.
    #[arg(long, value_name = "JWT_SECRET")]
    jwt_secret: PathBuf,

    // Directory scanned for `payload_block_<index>.json` files.
    #[arg(long, value_name = "PAYLOAD_DIR")]
    payload_dir: PathBuf,

    // Maximum number of payloads to load (applied after `skip`). `None` loads all of them.
    #[arg(long, value_name = "COUNT")]
    count: Option<usize>,

    // Number of payloads to skip, counted in ascending index order.
    #[arg(long, value_name = "SKIP", default_value = "0")]
    skip: usize,

    // Hidden and deprecated: the value is ignored; setting it only triggers a warning in
    // `execute`.
    #[arg(long, value_name = "GAS_RAMP_DIR", hide = true)]
    gas_ramp_dir: Option<PathBuf>,

    // Destination handed to `write_benchmark_results` and, when a metrics scraper is active, to
    // its `write_csv`. Nothing is written when unset.
    #[arg(long, value_name = "OUTPUT")]
    output: Option<PathBuf>,

    // Duration parsed by `parse_duration`.
    // NOTE(review): in the code visible in this file the value is only logged by `execute`; no
    // delay is applied between blocks. Confirm this is intended.
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    // Selects the reth-specific new-payload request shape (`RethNewPayloadInput` plus extra
    // parameters) instead of the versioned engine API request.
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    reth_new_payload: bool,

    // Persistence-wait mode forwarded with each reth new-payload request. Only accepted together
    // with `--reth-new-payload`. Passing the flag without a value is treated as "always"; when
    // the flag is absent, `execute` falls back to `WaitForPersistence::Never`.
    #[arg(
        long = "wait-for-persistence",
        value_name = "MODE",
        num_args = 0..=1,
        default_missing_value = "always",
        value_parser = clap::value_parser!(WaitForPersistence),
        requires = "reth_new_payload",
        verbatim_doc_comment
    )]
    wait_for_persistence: Option<WaitForPersistence>,

    // When set, `Some(false)` is sent as the third reth new-payload parameter; otherwise `None`
    // is sent. Only accepted together with `--reth-new-payload`.
    #[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
    no_wait_for_caches: bool,

    // Optional URL handed to `MetricsScraper::maybe_new`; when a scraper is created, metrics are
    // scraped after every block.
    #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
    metrics_url: Option<String>,
}
117
/// A payload file read from disk, ready to be replayed.
struct LoadedPayload {
    /// Index parsed from the `payload_block_<index>.json` file name; payloads are replayed in
    /// ascending order of this value.
    index: u64,
    /// The envelope deserialized from the file's JSON content.
    envelope: ExecutionPayloadEnvelopeV4,
    /// Block hash copied out of the envelope's innermost execution payload.
    block_hash: B256,
}
127
128impl Command {
129 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
131 info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
132
133 if let Some(duration) = self.wait_time {
135 info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
136 }
137 if self.reth_new_payload {
138 info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
139 }
140
141 let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
142
143 let jwt =
145 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
146 let jwt = JwtSecret::from_hex(jwt.trim())?;
147 let auth_url = Url::parse(&self.engine_rpc_url)?;
148
149 info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
150 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
151 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
152 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
153
154 let parent_block = auth_provider
156 .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
157 .await?
158 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
159
160 let initial_parent_hash = parent_block.header.hash;
161 let initial_parent_number = parent_block.header.number;
162
163 info!(
164 target: "reth-bench",
165 parent_hash = %initial_parent_hash,
166 parent_number = initial_parent_number,
167 "Using initial parent block"
168 );
169
170 if self.gas_ramp_dir.is_some() {
172 warn!(
173 target: "reth-bench",
174 "--gas-ramp-dir is deprecated and ignored. Use --testing.skip-gas-limit-ramp-check \
175 and --testing.gas-limit on the reth node instead."
176 );
177 }
178
179 let payloads = self.load_payloads()?;
181 if payloads.is_empty() {
182 return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
183 }
184 info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
185
186 let mut parent_hash = initial_parent_hash;
187
188 let mut results = Vec::new();
189 let total_benchmark_duration = Instant::now();
190
191 for (i, payload) in payloads.iter().enumerate() {
192 let envelope = &payload.envelope;
193 let block_hash = payload.block_hash;
194 let execution_payload = &envelope.envelope_inner.execution_payload;
195 let inner_payload = &execution_payload.payload_inner.payload_inner;
196
197 let gas_used = inner_payload.gas_used;
198 let gas_limit = inner_payload.gas_limit;
199 let block_number = inner_payload.block_number;
200 let transaction_count =
201 execution_payload.payload_inner.payload_inner.transactions.len() as u64;
202
203 debug!(
204 target: "reth-bench",
205 payload = i + 1,
206 total = payloads.len(),
207 index = payload.index,
208 block_hash = %block_hash,
209 "Executing payload (newPayload + FCU)"
210 );
211
212 let start = Instant::now();
213
214 debug!(
215 target: "reth-bench",
216 method = "engine_newPayloadV4",
217 block_hash = %block_hash,
218 "Sending newPayload"
219 );
220
221 let use_reth = self.reth_new_payload;
222 let (version, params) = if use_reth {
223 let wait_for_persistence = self
224 .wait_for_persistence
225 .unwrap_or(WaitForPersistence::Never)
226 .rpc_value(block_number);
227 let reth_data = ExecutionData {
228 payload: execution_payload.clone().into(),
229 sidecar: ExecutionPayloadSidecar::v4(
230 CancunPayloadFields {
231 versioned_hashes: Vec::new(),
232 parent_beacon_block_root: B256::ZERO,
233 },
234 PraguePayloadFields {
235 requests: envelope.execution_requests.clone().into(),
236 },
237 ),
238 };
239 (
240 None,
241 serde_json::to_value((
242 RethNewPayloadInput::ExecutionData(reth_data),
243 wait_for_persistence,
244 self.no_wait_for_caches.then_some(false),
245 ))?,
246 )
247 } else {
248 (
249 Some(EngineApiMessageVersion::V4),
250 serde_json::to_value((
251 execution_payload.clone(),
252 Vec::<B256>::new(),
253 B256::ZERO,
254 envelope.execution_requests.to_vec(),
255 ))?,
256 )
257 };
258
259 let server_timings =
260 call_new_payload_with_reth(&auth_provider, version, params).await?;
261
262 let np_latency =
263 server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
264 let new_payload_result = NewPayloadResult {
265 gas_used,
266 latency: np_latency,
267 persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
268 execution_cache_wait: server_timings
269 .as_ref()
270 .map(|t| t.execution_cache_wait)
271 .unwrap_or_default(),
272 sparse_trie_wait: server_timings
273 .as_ref()
274 .map(|t| t.sparse_trie_wait)
275 .unwrap_or_default(),
276 };
277
278 let fcu_state = ForkchoiceState {
279 head_block_hash: block_hash,
280 safe_block_hash: parent_hash,
281 finalized_block_hash: parent_hash,
282 };
283
284 let fcu_start = Instant::now();
285 call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
286 let fcu_latency = fcu_start.elapsed();
287
288 let total_latency =
289 if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
290
291 let combined_result = CombinedResult {
292 block_number,
293 gas_limit,
294 transaction_count,
295 new_payload_result,
296 fcu_latency,
297 total_latency,
298 };
299
300 let current_duration = total_benchmark_duration.elapsed();
301 let progress = format!("{}/{}", i + 1, payloads.len());
302 info!(target: "reth-bench", progress, %combined_result);
303
304 if let Some(scraper) = metrics_scraper.as_mut() &&
305 let Err(err) = scraper.scrape_after_block(block_number).await
306 {
307 tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
308 }
309
310 let gas_row =
311 TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
312 results.push((gas_row, combined_result));
313
314 parent_hash = block_hash;
315 }
316
317 let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
318 results.into_iter().unzip();
319
320 if let Some(ref path) = self.output {
321 write_benchmark_results(path, &gas_output_results, &combined_results)?;
322 }
323
324 if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
325 scraper.write_csv(path)?;
326 }
327
328 let gas_output =
329 TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
330 info!(
331 target: "reth-bench",
332 total_gas_used = gas_output.total_gas_used,
333 total_duration = ?gas_output.total_duration,
334 execution_duration = ?gas_output.execution_duration,
335 blocks_processed = gas_output.blocks_processed,
336 wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
337 execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
338 "Benchmark complete"
339 );
340
341 Ok(())
342 }
343
344 fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
346 let mut payloads = Vec::new();
347
348 let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
350 .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
351 .filter_map(|e| e.ok())
352 .filter(|e| {
353 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
354 e.file_name().to_string_lossy().starts_with("payload_block_")
355 })
356 .collect();
357
358 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
360 .into_iter()
361 .filter_map(|e| {
362 let name = e.file_name();
363 let name_str = name.to_string_lossy();
364 let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
366 let index: u64 = index_str.parse().ok()?;
367 Some((index, e.path()))
368 })
369 .collect();
370
371 indexed_paths.sort_by_key(|(idx, _)| *idx);
372
373 let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
375 let indexed_paths: Vec<_> = match self.count {
376 Some(count) => indexed_paths.into_iter().take(count).collect(),
377 None => indexed_paths,
378 };
379
380 for (index, path) in indexed_paths {
382 let content = std::fs::read_to_string(&path)
383 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
384 let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
385 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
386
387 let block_hash =
388 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
389
390 debug!(
391 target: "reth-bench",
392 index = index,
393 block_hash = %block_hash,
394 path = %path.display(),
395 "Loaded payload"
396 );
397
398 payloads.push(LoadedPayload { index, envelope, block_hash });
399 }
400
401 Ok(payloads)
402 }
403}