Skip to main content

reth_bench/bench/
replay_payloads.rs

1//! Command for replaying pre-generated payloads from disk.
2//!
3//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
4//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
5//!
6//! Supports configurable waiting behavior:
7//! - **`--wait-time`**: Fixed sleep interval between blocks.
8//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
9//!   `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
10//!   threshold. This ensures the benchmark doesn't outpace persistence.
11//!
12//! Both options can be used together or independently.
13
14use crate::{
15    authenticated_transport::AuthenticatedTransportConnect,
16    bench::{
17        helpers::parse_duration,
18        metrics_scraper::MetricsScraper,
19        output::{
20            write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
21            TotalGasOutput, TotalGasRow,
22        },
23        persistence_waiter::{
24            derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
25        },
26    },
27    valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
28};
29use alloy_primitives::B256;
30use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
31use alloy_rpc_client::ClientBuilder;
32use alloy_rpc_types_engine::{
33    CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
34    ForkchoiceState, JwtSecret, PraguePayloadFields,
35};
36use clap::Parser;
37use eyre::Context;
38use reth_cli_runner::CliContext;
39use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
40use reth_node_api::EngineApiMessageVersion;
41use reth_rpc_api::RethNewPayloadInput;
42use std::{
43    path::PathBuf,
44    time::{Duration, Instant},
45};
46use tracing::{debug, info};
47use url::Url;
48
// NOTE: the `///` comments on this struct and its fields are consumed by clap's derive macro
// and shown as `--help` text, so editing them changes user-visible output. Maintainer-only
// notes are therefore written as plain `//` comments.
/// `reth bench replay-payloads` command
///
/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
/// `forkchoiceUpdated` for each payload in sequence.
#[derive(Debug, Parser)]
pub struct Command {
    /// The engine RPC URL (with JWT authentication).
    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
    engine_rpc_url: String,

    /// Path to the JWT secret file for engine API authentication.
    #[arg(long, value_name = "JWT_SECRET")]
    jwt_secret: PathBuf,

    /// Directory containing payload files (`payload_block_N.json`).
    #[arg(long, value_name = "PAYLOAD_DIR")]
    payload_dir: PathBuf,

    // `count` and `skip` are applied by `load_payloads` after the files have been sorted by
    // the number in their filename: `skip` first, then `count`. Neither is applied to the
    // gas ramp directory.
    /// Optional limit on the number of payloads to replay.
    /// If not specified, replays all payloads in the directory.
    #[arg(long, value_name = "COUNT")]
    count: Option<usize>,

    /// Skip the first N payloads.
    #[arg(long, value_name = "SKIP", default_value = "0")]
    skip: usize,

    // When set, `execute` fails if this directory contains no matching payload files.
    /// Optional directory containing gas ramp payloads to replay first.
    /// These are replayed before the main payloads to warm up the gas limit.
    #[arg(long, value_name = "GAS_RAMP_DIR")]
    gas_ramp_dir: Option<PathBuf>,

    /// Optional output directory for benchmark results (CSV files).
    #[arg(long, value_name = "OUTPUT")]
    output: Option<PathBuf>,

    // `wait_time` and `wait_for_persistence` together select which `PersistenceWaiter`
    // constructor `execute` uses; when neither is set no waiter is created at all.
    /// How long to wait after a forkchoice update before sending the next payload.
    ///
    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
    /// milliseconds (e.g. `400`).
    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
    wait_time: Option<Duration>,

    /// Wait for blocks to be persisted before sending the next batch.
    ///
    /// When enabled, waits for every Nth block to be persisted using the
    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
    /// doesn't outpace persistence.
    ///
    /// The subscription uses the regular RPC websocket endpoint (no JWT required).
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    wait_for_persistence: bool,

    // Only passed on to the waiter when `wait_for_persistence` is set.
    /// Engine persistence threshold used for deciding when to wait for persistence.
    ///
    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
    /// at blocks 3, 6, 9, etc.
    #[arg(
        long = "persistence-threshold",
        value_name = "PERSISTENCE_THRESHOLD",
        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
        verbatim_doc_comment
    )]
    persistence_threshold: u64,

    // Passed both to `setup_persistence_subscription` and to the waiter itself.
    /// Timeout for waiting on persistence at each checkpoint.
    ///
    /// Must be long enough to account for the persistence thread being blocked
    /// by pruning after the previous save.
    #[arg(
        long = "persistence-timeout",
        value_name = "PERSISTENCE_TIMEOUT",
        value_parser = parse_duration,
        default_value = "120s",
        verbatim_doc_comment
    )]
    persistence_timeout: Duration,

    // Only consulted when `wait_for_persistence` is set; resolved through `derive_ws_rpc_url`.
    /// Optional `WebSocket` RPC URL for persistence subscription.
    /// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
    #[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
    ws_rpc_url: Option<String>,

    // Affects the main payloads only: gas ramp payloads use the version stored in each file.
    /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
    ///
    /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
    /// directly, waits for persistence and cache updates to complete before processing,
    /// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
    #[arg(long, default_value = "false", verbatim_doc_comment)]
    reth_new_payload: bool,

    // Scrape failures are logged as warnings by `execute` and do not abort the run. The CSV
    // is only written when `output` is also set.
    /// Optional Prometheus metrics endpoint to scrape after each block.
    ///
    /// When provided, reth-bench will fetch metrics from this URL after each
    /// payload, recording per-block execution and state root durations.
    /// Results are written to `metrics.csv` in the output directory.
    #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
    metrics_url: Option<String>,
}
149
/// A main-phase payload parsed from a `payload_block_N.json` file, ready to be replayed.
struct LoadedPayload {
    /// The `N` parsed from the `payload_block_N.json` filename.
    ///
    /// Payloads are sorted by this value before `--skip` / `--count` are applied; it is only
    /// used for ordering and logging.
    index: u64,
    /// The deserialized payload envelope.
    envelope: ExecutionPayloadEnvelopeV4,
    /// The block hash, copied out of the envelope's execution payload at load time so the
    /// replay loop does not have to walk the nested payload for it.
    block_hash: B256,
}
159
/// A gas ramp payload loaded from disk.
///
/// Gas ramp payloads are replayed before the main payloads and their results are not
/// recorded in the benchmark output.
struct GasRampPayload {
    /// Block number parsed from the `payload_block_N.json` filename.
    ///
    /// Used to order the gas ramp payloads and passed to the waiter after each one.
    block_number: u64,
    /// Engine API version for newPayload.
    ///
    /// `None` indicates that `reth_newPayload` should be used.
    version: Option<EngineApiMessageVersion>,
    /// The deserialized file contents; the replay loop reads its `params` and `block_hash`.
    file: GasRampPayloadFile,
}
171
172impl Command {
173    /// Execute the `replay-payloads` command.
174    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
175        info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
176
177        // Log mode configuration
178        if let Some(duration) = self.wait_time {
179            info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
180        }
181        if self.wait_for_persistence {
182            info!(
183                target: "reth-bench",
184                "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
185                self.persistence_threshold + 1,
186                self.persistence_threshold
187            );
188        }
189        if self.reth_new_payload {
190            info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
191        }
192
193        // Set up waiter based on configured options
194        // When both are set: wait at least wait_time, and also wait for persistence if needed
195        let mut waiter = match (self.wait_time, self.wait_for_persistence) {
196            (Some(duration), true) => {
197                let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
198                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
199                Some(PersistenceWaiter::with_duration_and_subscription(
200                    duration,
201                    sub,
202                    self.persistence_threshold,
203                    self.persistence_timeout,
204                ))
205            }
206            (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
207            (None, true) => {
208                let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
209                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
210                Some(PersistenceWaiter::with_subscription(
211                    sub,
212                    self.persistence_threshold,
213                    self.persistence_timeout,
214                ))
215            }
216            (None, false) => None,
217        };
218
219        let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
220
221        // Set up authenticated engine provider
222        let jwt =
223            std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
224        let jwt = JwtSecret::from_hex(jwt.trim())?;
225        let auth_url = Url::parse(&self.engine_rpc_url)?;
226
227        info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
228        let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
229        let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
230        let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
231
232        // Get parent block (latest canonical block) - we need this for the first FCU
233        let parent_block = auth_provider
234            .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
235            .await?
236            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
237
238        let initial_parent_hash = parent_block.header.hash;
239        let initial_parent_number = parent_block.header.number;
240
241        info!(
242            target: "reth-bench",
243            parent_hash = %initial_parent_hash,
244            parent_number = initial_parent_number,
245            "Using initial parent block"
246        );
247
248        // Load all payloads upfront to avoid I/O delays between phases
249        let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
250            let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
251            if payloads.is_empty() {
252                return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
253            }
254            info!(target: "reth-bench", count = payloads.len(), "Loaded gas ramp payloads from disk");
255            payloads
256        } else {
257            Vec::new()
258        };
259
260        let payloads = self.load_payloads()?;
261        if payloads.is_empty() {
262            return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
263        }
264        info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
265
266        let mut parent_hash = initial_parent_hash;
267
268        // Replay gas ramp payloads first
269        for (i, payload) in gas_ramp_payloads.iter().enumerate() {
270            info!(
271                target: "reth-bench",
272                gas_ramp_payload = i + 1,
273                total = gas_ramp_payloads.len(),
274                block_number = payload.block_number,
275                block_hash = %payload.file.block_hash,
276                "Executing gas ramp payload (newPayload + FCU)"
277            );
278
279            let _ = call_new_payload_with_reth(
280                &auth_provider,
281                payload.version,
282                payload.file.params.clone(),
283            )
284            .await?;
285
286            let fcu_state = ForkchoiceState {
287                head_block_hash: payload.file.block_hash,
288                safe_block_hash: parent_hash,
289                finalized_block_hash: parent_hash,
290            };
291            call_forkchoice_updated_with_reth(&auth_provider, payload.version, fcu_state).await?;
292
293            info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
294
295            if let Some(w) = &mut waiter {
296                w.on_block(payload.block_number).await?;
297            }
298
299            parent_hash = payload.file.block_hash;
300        }
301
302        if !gas_ramp_payloads.is_empty() {
303            info!(target: "reth-bench", count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
304        }
305
306        let mut results = Vec::new();
307        let total_benchmark_duration = Instant::now();
308
309        for (i, payload) in payloads.iter().enumerate() {
310            let envelope = &payload.envelope;
311            let block_hash = payload.block_hash;
312            let execution_payload = &envelope.envelope_inner.execution_payload;
313            let inner_payload = &execution_payload.payload_inner.payload_inner;
314
315            let gas_used = inner_payload.gas_used;
316            let gas_limit = inner_payload.gas_limit;
317            let block_number = inner_payload.block_number;
318            let transaction_count =
319                execution_payload.payload_inner.payload_inner.transactions.len() as u64;
320
321            debug!(
322                target: "reth-bench",
323                payload = i + 1,
324                total = payloads.len(),
325                index = payload.index,
326                block_hash = %block_hash,
327                "Executing payload (newPayload + FCU)"
328            );
329
330            let start = Instant::now();
331
332            debug!(
333                target: "reth-bench",
334                method = "engine_newPayloadV4",
335                block_hash = %block_hash,
336                "Sending newPayload"
337            );
338
339            let (version, params) = if self.reth_new_payload {
340                let reth_data = ExecutionData {
341                    payload: execution_payload.clone().into(),
342                    sidecar: ExecutionPayloadSidecar::v4(
343                        CancunPayloadFields {
344                            versioned_hashes: Vec::new(),
345                            parent_beacon_block_root: B256::ZERO,
346                        },
347                        PraguePayloadFields {
348                            requests: envelope.execution_requests.clone().into(),
349                        },
350                    ),
351                };
352                (None, serde_json::to_value((RethNewPayloadInput::ExecutionData(reth_data),))?)
353            } else {
354                (
355                    Some(EngineApiMessageVersion::V4),
356                    serde_json::to_value((
357                        execution_payload.clone(),
358                        Vec::<B256>::new(),
359                        B256::ZERO,
360                        envelope.execution_requests.to_vec(),
361                    ))?,
362                )
363            };
364
365            let server_timings =
366                call_new_payload_with_reth(&auth_provider, version, params).await?;
367
368            let np_latency =
369                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
370            let new_payload_result = NewPayloadResult {
371                gas_used,
372                latency: np_latency,
373                persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
374                execution_cache_wait: server_timings
375                    .as_ref()
376                    .map(|t| t.execution_cache_wait)
377                    .unwrap_or_default(),
378                sparse_trie_wait: server_timings
379                    .as_ref()
380                    .map(|t| t.sparse_trie_wait)
381                    .unwrap_or_default(),
382            };
383
384            let fcu_state = ForkchoiceState {
385                head_block_hash: block_hash,
386                safe_block_hash: parent_hash,
387                finalized_block_hash: parent_hash,
388            };
389
390            let fcu_start = Instant::now();
391            call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
392            let fcu_latency = fcu_start.elapsed();
393
394            let total_latency =
395                if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
396
397            let combined_result = CombinedResult {
398                block_number,
399                gas_limit,
400                transaction_count,
401                new_payload_result,
402                fcu_latency,
403                total_latency,
404            };
405
406            let current_duration = total_benchmark_duration.elapsed();
407            let progress = format!("{}/{}", i + 1, payloads.len());
408            info!(target: "reth-bench", progress, %combined_result);
409
410            if let Some(scraper) = metrics_scraper.as_mut() &&
411                let Err(err) = scraper.scrape_after_block(block_number).await
412            {
413                tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
414            }
415
416            if let Some(w) = &mut waiter {
417                w.on_block(block_number).await?;
418            }
419
420            let gas_row =
421                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
422            results.push((gas_row, combined_result));
423
424            parent_hash = block_hash;
425        }
426
427        // Drop waiter - we don't need to wait for final blocks to persist
428        // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
429        drop(waiter);
430
431        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
432            results.into_iter().unzip();
433
434        if let Some(ref path) = self.output {
435            write_benchmark_results(path, &gas_output_results, &combined_results)?;
436        }
437
438        if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
439            scraper.write_csv(path)?;
440        }
441
442        let gas_output =
443            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
444        info!(
445            target: "reth-bench",
446            total_gas_used = gas_output.total_gas_used,
447            total_duration = ?gas_output.total_duration,
448            execution_duration = ?gas_output.execution_duration,
449            blocks_processed = gas_output.blocks_processed,
450            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
451            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
452            "Benchmark complete"
453        );
454
455        Ok(())
456    }
457
458    /// Load and parse all payload files from the directory.
459    fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
460        let mut payloads = Vec::new();
461
462        // Read directory entries
463        let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
464            .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
465            .filter_map(|e| e.ok())
466            .filter(|e| {
467                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
468                    e.file_name().to_string_lossy().starts_with("payload_block_")
469            })
470            .collect();
471
472        // Parse filenames to get indices and sort
473        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
474            .into_iter()
475            .filter_map(|e| {
476                let name = e.file_name();
477                let name_str = name.to_string_lossy();
478                // Extract index from "payload_NNN.json"
479                let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
480                let index: u64 = index_str.parse().ok()?;
481                Some((index, e.path()))
482            })
483            .collect();
484
485        indexed_paths.sort_by_key(|(idx, _)| *idx);
486
487        // Apply skip and count
488        let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
489        let indexed_paths: Vec<_> = match self.count {
490            Some(count) => indexed_paths.into_iter().take(count).collect(),
491            None => indexed_paths,
492        };
493
494        // Load each payload
495        for (index, path) in indexed_paths {
496            let content = std::fs::read_to_string(&path)
497                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
498            let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
499                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
500
501            let block_hash =
502                envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
503
504            debug!(
505                target: "reth-bench",
506                index = index,
507                block_hash = %block_hash,
508                path = %path.display(),
509                "Loaded payload"
510            );
511
512            payloads.push(LoadedPayload { index, envelope, block_hash });
513        }
514
515        Ok(payloads)
516    }
517
518    /// Load and parse gas ramp payload files from a directory.
519    fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
520        let mut payloads = Vec::new();
521
522        let entries: Vec<_> = std::fs::read_dir(dir)
523            .wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
524            .filter_map(|e| e.ok())
525            .filter(|e| {
526                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
527                    e.file_name().to_string_lossy().starts_with("payload_block_")
528            })
529            .collect();
530
531        // Parse filenames to get block numbers and sort
532        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
533            .into_iter()
534            .filter_map(|e| {
535                let name = e.file_name();
536                let name_str = name.to_string_lossy();
537                // Extract block number from "payload_block_NNN.json"
538                let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
539                let block_number: u64 = block_str.parse().ok()?;
540                Some((block_number, e.path()))
541            })
542            .collect();
543
544        indexed_paths.sort_by_key(|(num, _)| *num);
545
546        for (block_number, path) in indexed_paths {
547            let content = std::fs::read_to_string(&path)
548                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
549            let file: GasRampPayloadFile = serde_json::from_str(&content)
550                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
551
552            let version = if let Some(version) = file.version {
553                match version {
554                    1 => EngineApiMessageVersion::V1,
555                    2 => EngineApiMessageVersion::V2,
556                    3 => EngineApiMessageVersion::V3,
557                    4 => EngineApiMessageVersion::V4,
558                    5 => EngineApiMessageVersion::V5,
559                    v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
560                }
561                .into()
562            } else {
563                None
564            };
565
566            info!(
567                block_number,
568                block_hash = %file.block_hash,
569                path = %path.display(),
570                "Loaded gas ramp payload"
571            );
572
573            payloads.push(GasRampPayload { block_number, version, file });
574        }
575
576        Ok(payloads)
577    }
578}