Skip to main content

reth_bench/bench/
replay_payloads.rs

1//! Command for replaying pre-generated payloads from disk.
2
3use crate::{
4    authenticated_transport::AuthenticatedTransportConnect,
5    bench::{
6        generate_big_block::{compute_payload_block_hash, BigBlockPayload},
7        helpers::parse_duration,
8        metrics_scraper::MetricsScraper,
9        output::{
10            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
11        },
12    },
13    valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
14};
15use alloy_eip7928::bal::Bal;
16use alloy_eips::eip7928::BlockAccessList;
17use alloy_primitives::B256;
18use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
19use alloy_rpc_client::ClientBuilder;
20use alloy_rpc_types_engine::{
21    CancunPayloadFields, ExecutionData, ExecutionPayload, ExecutionPayloadEnvelopeV4,
22    ExecutionPayloadSidecar, ExecutionPayloadV4, ForkchoiceState, JwtSecret, PraguePayloadFields,
23};
24use clap::Parser;
25use eyre::Context;
26use reth_cli_runner::CliContext;
27use reth_engine_primitives::BigBlockData;
28use reth_node_api::EngineApiMessageVersion;
29use reth_node_core::args::WaitForPersistence;
30use reth_rpc_api::RethNewPayloadInput;
31use std::{
32    path::PathBuf,
33    time::{Duration, Instant},
34};
35use tracing::{debug, info, warn};
36use url::Url;
37
38/// `reth bench replay-payloads` command
39///
40/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
41/// `forkchoiceUpdated` for each payload in sequence.
42#[derive(Debug, Parser)]
43pub struct Command {
44    /// The engine RPC URL (with JWT authentication).
45    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
46    engine_rpc_url: String,
47
48    /// Path to the JWT secret file for engine API authentication.
49    #[arg(long, value_name = "JWT_SECRET")]
50    jwt_secret: PathBuf,
51
52    /// Directory containing payload files (`payload_block_N.json`).
53    #[arg(long, value_name = "PAYLOAD_DIR")]
54    payload_dir: PathBuf,
55
56    /// Optional limit on the number of payloads to replay.
57    /// If not specified, replays all payloads in the directory.
58    #[arg(long, value_name = "COUNT")]
59    count: Option<usize>,
60
61    /// Skip the first N payloads.
62    #[arg(long, value_name = "SKIP", default_value = "0")]
63    skip: usize,
64
65    /// Deprecated: gas ramp is no longer needed. This flag is accepted but ignored.
66    #[arg(long, value_name = "GAS_RAMP_DIR", hide = true)]
67    gas_ramp_dir: Option<PathBuf>,
68
69    /// Optional output directory for benchmark results (CSV files).
70    #[arg(long, value_name = "OUTPUT")]
71    output: Option<PathBuf>,
72
73    /// How long to wait after a forkchoice update before sending the next payload.
74    ///
75    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
76    /// milliseconds (e.g. `400`).
77    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
78    wait_time: Option<Duration>,
79
80    /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
81    ///
82    /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
83    /// directly, waits for persistence and cache updates to complete before processing,
84    /// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
85    #[arg(long, default_value = "false", verbatim_doc_comment)]
86    reth_new_payload: bool,
87
88    /// Forward embedded block access lists to `reth_newPayload` when payload files contain them.
89    ///
90    /// Disabled by default so the same payload set can be replayed with or without BALs.
91    ///
92    /// Requires `--reth-new-payload`.
93    #[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
94    bal: bool,
95
96    /// Control when `reth_newPayload` waits for in-flight persistence.
97    ///
98    /// Accepts `always` (default — wait on every block), `never`, or a number N
99    /// to wait every N blocks and skip the rest.
100    ///
101    /// Requires `--reth-new-payload`.
102    #[arg(
103        long = "wait-for-persistence",
104        value_name = "MODE",
105        num_args = 0..=1,
106        default_missing_value = "always",
107        value_parser = clap::value_parser!(WaitForPersistence),
108        requires = "reth_new_payload",
109        verbatim_doc_comment
110    )]
111    wait_for_persistence: Option<WaitForPersistence>,
112
113    /// Skip waiting for execution cache and sparse trie locks before processing.
114    ///
115    /// Only works with `--reth-new-payload`. When set, passes `wait_for_caches: false`
116    /// to the `reth_newPayload` endpoint.
117    #[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
118    no_wait_for_caches: bool,
119
120    /// Optional Prometheus metrics endpoint to scrape after each block.
121    ///
122    /// When provided, reth-bench will fetch metrics from this URL after each
123    /// payload, recording per-block execution and state root durations.
124    /// Results are written to `metrics.csv` in the output directory.
125    #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
126    metrics_url: Option<String>,
127}
128
129/// A loaded payload ready for execution.
130struct LoadedPayload {
131    /// The index (from filename).
132    index: u64,
133    /// The execution data for the block.
134    execution_data: ExecutionData,
135    /// The block hash.
136    block_hash: B256,
137    /// Big block data containing environment switches and prior block hashes.
138    big_block_data: BigBlockData<ExecutionData>,
139    /// Optional BAL flattened into the payload file.
140    block_access_list: Option<BlockAccessList>,
141}
142
143impl Command {
144    /// Execute the `replay-payloads` command.
145    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
146        info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
147
148        // Log mode configuration
149        if let Some(duration) = self.wait_time {
150            info!(target: "reth-bench", "Using wait-time mode with {}ms minimum interval between blocks", duration.as_millis());
151        }
152        if self.reth_new_payload {
153            info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
154            if self.bal {
155                info!(target: "reth-bench", "Forwarding embedded block_access_list data");
156            }
157        }
158
159        let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
160
161        // Set up authenticated engine provider
162        let jwt =
163            std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
164        let jwt = JwtSecret::from_hex(jwt.trim())?;
165        let auth_url = Url::parse(&self.engine_rpc_url)?;
166
167        info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
168        let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
169        let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
170        let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
171
172        // Get parent block (latest canonical block) - we need this for the first FCU
173        let parent_block = auth_provider
174            .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
175            .await?
176            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
177
178        let initial_parent_hash = parent_block.header.hash;
179        let initial_parent_number = parent_block.header.number;
180
181        info!(
182            target: "reth-bench",
183            parent_hash = %initial_parent_hash,
184            parent_number = initial_parent_number,
185            "Using initial parent block"
186        );
187
188        // Warn if deprecated --gas-ramp-dir is passed
189        if self.gas_ramp_dir.is_some() {
190            warn!(
191                target: "reth-bench",
192                "--gas-ramp-dir is deprecated and ignored."
193            );
194        }
195
196        // Load all payloads upfront to avoid I/O delays between phases
197        let payloads = self.load_payloads()?;
198        if payloads.is_empty() {
199            return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
200        }
201        info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
202
203        let has_env_switches = payloads.iter().any(|p| !p.big_block_data.env_switches.is_empty());
204        let has_block_access_lists = payloads.iter().any(|p| {
205            p.block_access_list.as_ref().is_some_and(|bal: &BlockAccessList| !bal.is_empty())
206        });
207
208        // If any payload has env_switches but we're not using reth_newPayload, warn the user
209        if !self.reth_new_payload {
210            if has_env_switches {
211                warn!(
212                    target: "reth-bench",
213                    "Payloads contain env_switches but --reth-new-payload is not set. \
214                     env_switches are only supported with reth_newPayload and will be ignored."
215                );
216            }
217            if has_block_access_lists {
218                warn!(
219                    target: "reth-bench",
220                    "Payloads contain block_access_list data but --reth-new-payload is not set. \
221                     BALs are only forwarded with reth_newPayload and will be ignored."
222                );
223            }
224        } else if has_block_access_lists && !self.bal {
225            info!(
226                target: "reth-bench",
227                "Payloads contain block_access_list data but --bal is not set. BALs will be ignored."
228            );
229        }
230
231        let mut parent_hash = initial_parent_hash;
232
233        let mut results = Vec::new();
234        let total_benchmark_duration = Instant::now();
235
236        for (i, payload) in payloads.iter().enumerate() {
237            let execution_data = &payload.execution_data;
238            let mut block_hash = payload.block_hash;
239            let v1 = execution_data.payload.as_v1();
240
241            let gas_used = v1.gas_used;
242            let gas_limit = v1.gas_limit;
243            let block_number = v1.block_number;
244            let transaction_count = v1.transactions.len() as u64;
245
246            debug!(
247                target: "reth-bench",
248                payload = i + 1,
249                total = payloads.len(),
250                index = payload.index,
251                block_hash = %block_hash,
252                "Executing payload (newPayload + FCU)"
253            );
254
255            let start = Instant::now();
256
257            debug!(
258                target: "reth-bench",
259                method = "engine_newPayloadV4",
260                block_hash = %block_hash,
261                "Sending newPayload"
262            );
263
264            let (version, params) = if self.reth_new_payload {
265                let big_block_data_param = if payload.big_block_data.env_switches.is_empty() &&
266                    payload.big_block_data.prior_block_hashes.is_empty()
267                {
268                    None
269                } else {
270                    Some(payload.big_block_data.clone())
271                };
272                let wait_for_persistence = self
273                    .wait_for_persistence
274                    .unwrap_or(WaitForPersistence::Never)
275                    .rpc_value(block_number);
276
277                // Inject sidecar BAL into the inline V4 payload field when --bal is set.
278                // If the payload is not already V4 we upgrade it (V3→V4) so the BAL
279                // can be carried inline. This changes the block hash, so we recompute
280                // it and patch parent_hash to maintain the chain.
281                let mut execution_data = execution_data.clone();
282                if self.bal &&
283                    let Some(bal) = &payload.block_access_list
284                {
285                    let encoded_bal: alloy_primitives::Bytes =
286                        alloy_rlp::encode(Bal::from(bal.clone())).into();
287
288                    // Upgrade to V4 if necessary, then set the BAL field.
289                    if execution_data.payload.as_v4().is_none() {
290                        execution_data.payload = upgrade_to_v4(execution_data.payload, encoded_bal);
291                    } else {
292                        execution_data.payload.as_v4_mut().unwrap().block_access_list = encoded_bal;
293                    }
294
295                    // Patch parent_hash so this block chains off the (possibly
296                    // rehashed) previous block.
297                    execution_data.payload.as_v1_mut().parent_hash = parent_hash;
298
299                    // Recompute block hash after payload modification and update
300                    // the hash stored in the payload itself.
301                    block_hash = compute_payload_block_hash(&execution_data)?;
302                    execution_data.payload.as_v1_mut().block_hash = block_hash;
303                }
304
305                (
306                    None,
307                    serde_json::to_value((
308                        RethNewPayloadInput::ExecutionData(execution_data),
309                        wait_for_persistence,
310                        self.no_wait_for_caches.then_some(false),
311                        big_block_data_param,
312                    ))?,
313                )
314            } else {
315                let requests =
316                    execution_data.sidecar.requests().cloned().unwrap_or_default().to_vec();
317                (
318                    Some(EngineApiMessageVersion::V4),
319                    serde_json::to_value((
320                        execution_data.payload.clone(),
321                        Vec::<B256>::new(),
322                        B256::ZERO,
323                        requests,
324                    ))?,
325                )
326            };
327
328            let server_timings =
329                call_new_payload_with_reth(&auth_provider, version, params).await?;
330
331            let np_latency =
332                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
333            let new_payload_result = NewPayloadResult {
334                gas_used,
335                latency: np_latency,
336                persistence_wait: server_timings
337                    .as_ref()
338                    .map(|t| t.persistence_wait)
339                    .unwrap_or_default(),
340                execution_cache_wait: server_timings
341                    .as_ref()
342                    .map(|t| t.execution_cache_wait)
343                    .unwrap_or_default(),
344                sparse_trie_wait: server_timings
345                    .as_ref()
346                    .map(|t| t.sparse_trie_wait)
347                    .unwrap_or_default(),
348            };
349
350            let fcu_state = ForkchoiceState {
351                head_block_hash: block_hash,
352                safe_block_hash: parent_hash,
353                finalized_block_hash: parent_hash,
354            };
355
356            let fcu_start = Instant::now();
357            call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
358            let fcu_latency = fcu_start.elapsed();
359
360            let total_latency =
361                if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
362
363            let combined_result = CombinedResult {
364                block_number,
365                gas_limit,
366                transaction_count,
367                new_payload_result,
368                fcu_latency,
369                total_latency,
370            };
371
372            let current_duration = total_benchmark_duration.elapsed();
373            let progress = format!("{}/{}", i + 1, payloads.len());
374            info!(target: "reth-bench", progress, %combined_result);
375
376            if let Some(scraper) = metrics_scraper.as_mut() &&
377                let Err(err) = scraper.scrape_after_block(block_number).await
378            {
379                tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
380            }
381
382            if let Some(wait_time) = self.wait_time {
383                let remaining = wait_time.saturating_sub(start.elapsed());
384                if !remaining.is_zero() {
385                    tokio::time::sleep(remaining).await;
386                }
387            }
388
389            let gas_row =
390                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
391            results.push((gas_row, combined_result));
392
393            parent_hash = block_hash;
394        }
395
396        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
397            results.into_iter().unzip();
398
399        if let Some(ref path) = self.output {
400            write_benchmark_results(path, &gas_output_results, &combined_results)?;
401        }
402
403        if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
404            scraper.write_csv(path)?;
405        }
406
407        let gas_output =
408            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
409        info!(
410            target: "reth-bench",
411            total_gas_used = gas_output.total_gas_used,
412            total_duration = ?gas_output.total_duration,
413            execution_duration = ?gas_output.execution_duration,
414            blocks_processed = gas_output.blocks_processed,
415            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
416            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
417            "Benchmark complete"
418        );
419
420        Ok(())
421    }
422
423    /// Load and parse all payload files from the directory.
424    ///
425    /// Tries to load each file as a [`BigBlockPayload`] first (which includes `env_switches`),
426    /// falling back to [`ExecutionPayloadEnvelopeV4`] for backwards compatibility.
427    fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
428        let mut payloads = Vec::new();
429
430        // Read directory entries — match both legacy "payload_block_*.json" and new
431        // "big_block_*.json" formats
432        let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
433            .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
434            .filter_map(|e| e.ok())
435            .filter(|e| {
436                let name = e.file_name();
437                let name_str = name.to_string_lossy();
438                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
439                    (name_str.starts_with("payload_block_") ||
440                        name_str.starts_with("big_block_"))
441            })
442            .collect();
443
444        // Parse filenames to get indices and sort.
445        // Supports "payload_block_N.json" and "big_block_FROM_to_TO.json" naming.
446        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
447            .into_iter()
448            .filter_map(|e| {
449                let name = e.file_name();
450                let name_str = name.to_string_lossy();
451                let index = if let Some(rest) = name_str.strip_prefix("payload_block_") {
452                    rest.strip_suffix(".json")?.parse::<u64>().ok()?
453                } else if let Some(rest) = name_str.strip_prefix("big_block_") {
454                    // "big_block_FROM_to_TO.json" — use FROM as the index
455                    let rest = rest.strip_suffix(".json")?;
456                    rest.split("_to_").next()?.parse::<u64>().ok()?
457                } else {
458                    return None;
459                };
460                Some((index, e.path()))
461            })
462            .collect();
463
464        indexed_paths.sort_by_key(|(idx, _)| *idx);
465
466        // Apply skip and count
467        let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
468        let indexed_paths: Vec<_> = match self.count {
469            Some(count) => indexed_paths.into_iter().take(count).collect(),
470            None => indexed_paths,
471        };
472
473        // Load each payload
474        for (index, path) in indexed_paths {
475            let content = std::fs::read_to_string(&path)
476                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
477
478            // Try BigBlockPayload first, then fall back to legacy ExecutionPayloadEnvelopeV4
479            let (execution_data, big_block_data, block_access_list) = if let Ok(big_block) =
480                serde_json::from_str::<BigBlockPayload>(&content)
481            {
482                (big_block.execution_data, big_block.big_block_data, big_block.block_access_list)
483            } else {
484                let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
485                    .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
486                let execution_data = ExecutionData {
487                    payload: envelope.envelope_inner.execution_payload.clone().into(),
488                    sidecar: ExecutionPayloadSidecar::v4(
489                        CancunPayloadFields {
490                            versioned_hashes: Vec::new(),
491                            parent_beacon_block_root: B256::ZERO,
492                        },
493                        PraguePayloadFields {
494                            requests: envelope.execution_requests.clone().into(),
495                        },
496                    ),
497                };
498                (execution_data, BigBlockData::default(), None)
499            };
500
501            let block_hash = execution_data.payload.as_v1().block_hash;
502
503            debug!(
504                target: "reth-bench",
505                index = index,
506                block_hash = %block_hash,
507                env_switches = big_block_data.env_switches.len(),
508                prior_block_hashes = big_block_data.prior_block_hashes.len(),
509                bal_accounts = block_access_list.as_ref().map_or(0, Vec::len),
510                path = %path.display(),
511                "Loaded payload"
512            );
513
514            payloads.push(LoadedPayload {
515                index,
516                execution_data,
517                block_hash,
518                big_block_data,
519                block_access_list,
520            });
521        }
522
523        Ok(payloads)
524    }
525}
526
527/// Upgrades an [`ExecutionPayload`] to V4 by wrapping the inner V3 payload (constructing
528/// default V2/V3 layers for V1 payloads if needed) and setting the provided BAL bytes.
529fn upgrade_to_v4(
530    payload: ExecutionPayload,
531    block_access_list: alloy_primitives::Bytes,
532) -> ExecutionPayload {
533    use alloy_rpc_types_engine::{ExecutionPayloadV2, ExecutionPayloadV3};
534
535    let v3 = match payload {
536        ExecutionPayload::V4(_) => unreachable!("caller checks as_v4().is_none()"),
537        ExecutionPayload::V3(v3) => v3,
538        ExecutionPayload::V2(v2) => {
539            ExecutionPayloadV3 { payload_inner: v2, blob_gas_used: 0, excess_blob_gas: 0 }
540        }
541        ExecutionPayload::V1(v1) => ExecutionPayloadV3 {
542            payload_inner: ExecutionPayloadV2 { payload_inner: v1, withdrawals: Vec::new() },
543            blob_gas_used: 0,
544            excess_blob_gas: 0,
545        },
546    };
547
548    ExecutionPayload::V4(ExecutionPayloadV4 {
549        payload_inner: v3,
550        block_access_list,
551        slot_number: 0,
552    })
553}