Skip to main content

reth_bench/bench/
replay_payloads.rs

1//! Command for replaying pre-generated payloads from disk.
2//!
3//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
4//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
5//!
6//! Supports configurable waiting behavior:
7//! - **`--wait-time`**: Fixed sleep interval between blocks.
8//! - **`--wait-for-persistence`**: Waits for every Nth block to be persisted using the
9//!   `reth_subscribePersistedBlock` subscription, where N matches the engine's persistence
10//!   threshold. This ensures the benchmark doesn't outpace persistence.
11//!
12//! Both options can be used together or independently.
13
14use crate::{
15    authenticated_transport::AuthenticatedTransportConnect,
16    bench::{
17        helpers::parse_duration,
18        output::{
19            write_benchmark_results, CombinedResult, GasRampPayloadFile, NewPayloadResult,
20            TotalGasOutput, TotalGasRow,
21        },
22        persistence_waiter::{
23            derive_ws_rpc_url, setup_persistence_subscription, PersistenceWaiter,
24        },
25    },
26    valid_payload::{call_forkchoice_updated, call_new_payload},
27};
28use alloy_primitives::B256;
29use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
30use alloy_rpc_client::ClientBuilder;
31use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
32use clap::Parser;
33use eyre::Context;
34use reth_cli_runner::CliContext;
35use reth_engine_primitives::config::DEFAULT_PERSISTENCE_THRESHOLD;
36use reth_node_api::EngineApiMessageVersion;
37use std::{
38    path::PathBuf,
39    time::{Duration, Instant},
40};
41use tracing::{debug, info};
42use url::Url;
43
44/// `reth bench replay-payloads` command
45///
46/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
47/// `forkchoiceUpdated` for each payload in sequence.
48#[derive(Debug, Parser)]
49pub struct Command {
50    /// The engine RPC URL (with JWT authentication).
51    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
52    engine_rpc_url: String,
53
54    /// Path to the JWT secret file for engine API authentication.
55    #[arg(long, value_name = "JWT_SECRET")]
56    jwt_secret: PathBuf,
57
58    /// Directory containing payload files (`payload_block_N.json`).
59    #[arg(long, value_name = "PAYLOAD_DIR")]
60    payload_dir: PathBuf,
61
62    /// Optional limit on the number of payloads to replay.
63    /// If not specified, replays all payloads in the directory.
64    #[arg(long, value_name = "COUNT")]
65    count: Option<usize>,
66
67    /// Skip the first N payloads.
68    #[arg(long, value_name = "SKIP", default_value = "0")]
69    skip: usize,
70
71    /// Optional directory containing gas ramp payloads to replay first.
72    /// These are replayed before the main payloads to warm up the gas limit.
73    #[arg(long, value_name = "GAS_RAMP_DIR")]
74    gas_ramp_dir: Option<PathBuf>,
75
76    /// Optional output directory for benchmark results (CSV files).
77    #[arg(long, value_name = "OUTPUT")]
78    output: Option<PathBuf>,
79
80    /// How long to wait after a forkchoice update before sending the next payload.
81    ///
82    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
83    /// milliseconds (e.g. `400`).
84    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
85    wait_time: Option<Duration>,
86
87    /// Wait for blocks to be persisted before sending the next batch.
88    ///
89    /// When enabled, waits for every Nth block to be persisted using the
90    /// `reth_subscribePersistedBlock` subscription. This ensures the benchmark
91    /// doesn't outpace persistence.
92    ///
93    /// The subscription uses the regular RPC websocket endpoint (no JWT required).
94    #[arg(long, default_value = "false", verbatim_doc_comment)]
95    wait_for_persistence: bool,
96
97    /// Engine persistence threshold used for deciding when to wait for persistence.
98    ///
99    /// The benchmark waits after every `(threshold + 1)` blocks. By default this
100    /// matches the engine's `DEFAULT_PERSISTENCE_THRESHOLD` (2), so waits occur
101    /// at blocks 3, 6, 9, etc.
102    #[arg(
103        long = "persistence-threshold",
104        value_name = "PERSISTENCE_THRESHOLD",
105        default_value_t = DEFAULT_PERSISTENCE_THRESHOLD,
106        verbatim_doc_comment
107    )]
108    persistence_threshold: u64,
109
110    /// Timeout for waiting on persistence at each checkpoint.
111    ///
112    /// Must be long enough to account for the persistence thread being blocked
113    /// by pruning after the previous save.
114    #[arg(
115        long = "persistence-timeout",
116        value_name = "PERSISTENCE_TIMEOUT",
117        value_parser = parse_duration,
118        default_value = "120s",
119        verbatim_doc_comment
120    )]
121    persistence_timeout: Duration,
122
123    /// Optional `WebSocket` RPC URL for persistence subscription.
124    /// If not provided, derives from engine RPC URL by changing scheme to ws and port to 8546.
125    #[arg(long, value_name = "WS_RPC_URL", verbatim_doc_comment)]
126    ws_rpc_url: Option<String>,
127}
128
129/// A loaded payload ready for execution.
130struct LoadedPayload {
131    /// The index (from filename).
132    index: u64,
133    /// The payload envelope.
134    envelope: ExecutionPayloadEnvelopeV4,
135    /// The block hash.
136    block_hash: B256,
137}
138
139/// A gas ramp payload loaded from disk.
140struct GasRampPayload {
141    /// Block number from filename.
142    block_number: u64,
143    /// Engine API version for newPayload.
144    version: EngineApiMessageVersion,
145    /// The file contents.
146    file: GasRampPayloadFile,
147}
148
149impl Command {
150    /// Execute the `replay-payloads` command.
151    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
152        info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
153
154        // Log mode configuration
155        if let Some(duration) = self.wait_time {
156            info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
157        }
158        if self.wait_for_persistence {
159            info!(
160                target: "reth-bench",
161                "Persistence waiting enabled (waits after every {} blocks to match engine gap > {} behavior)",
162                self.persistence_threshold + 1,
163                self.persistence_threshold
164            );
165        }
166
167        // Set up waiter based on configured options
168        // When both are set: wait at least wait_time, and also wait for persistence if needed
169        let mut waiter = match (self.wait_time, self.wait_for_persistence) {
170            (Some(duration), true) => {
171                let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
172                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
173                Some(PersistenceWaiter::with_duration_and_subscription(
174                    duration,
175                    sub,
176                    self.persistence_threshold,
177                    self.persistence_timeout,
178                ))
179            }
180            (Some(duration), false) => Some(PersistenceWaiter::with_duration(duration)),
181            (None, true) => {
182                let ws_url = derive_ws_rpc_url(self.ws_rpc_url.as_deref(), &self.engine_rpc_url)?;
183                let sub = setup_persistence_subscription(ws_url, self.persistence_timeout).await?;
184                Some(PersistenceWaiter::with_subscription(
185                    sub,
186                    self.persistence_threshold,
187                    self.persistence_timeout,
188                ))
189            }
190            (None, false) => None,
191        };
192
193        // Set up authenticated engine provider
194        let jwt =
195            std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
196        let jwt = JwtSecret::from_hex(jwt.trim())?;
197        let auth_url = Url::parse(&self.engine_rpc_url)?;
198
199        info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
200        let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
201        let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
202        let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
203
204        // Get parent block (latest canonical block) - we need this for the first FCU
205        let parent_block = auth_provider
206            .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
207            .await?
208            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
209
210        let initial_parent_hash = parent_block.header.hash;
211        let initial_parent_number = parent_block.header.number;
212
213        info!(
214            target: "reth-bench",
215            parent_hash = %initial_parent_hash,
216            parent_number = initial_parent_number,
217            "Using initial parent block"
218        );
219
220        // Load all payloads upfront to avoid I/O delays between phases
221        let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
222            let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
223            if payloads.is_empty() {
224                return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
225            }
226            info!(target: "reth-bench", count = payloads.len(), "Loaded gas ramp payloads from disk");
227            payloads
228        } else {
229            Vec::new()
230        };
231
232        let payloads = self.load_payloads()?;
233        if payloads.is_empty() {
234            return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
235        }
236        info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
237
238        let mut parent_hash = initial_parent_hash;
239
240        // Replay gas ramp payloads first
241        for (i, payload) in gas_ramp_payloads.iter().enumerate() {
242            info!(
243                target: "reth-bench",
244                gas_ramp_payload = i + 1,
245                total = gas_ramp_payloads.len(),
246                block_number = payload.block_number,
247                block_hash = %payload.file.block_hash,
248                "Executing gas ramp payload (newPayload + FCU)"
249            );
250
251            call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
252
253            let fcu_state = ForkchoiceState {
254                head_block_hash: payload.file.block_hash,
255                safe_block_hash: parent_hash,
256                finalized_block_hash: parent_hash,
257            };
258            call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
259
260            info!(target: "reth-bench", gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
261
262            if let Some(w) = &mut waiter {
263                w.on_block(payload.block_number).await?;
264            }
265
266            parent_hash = payload.file.block_hash;
267        }
268
269        if !gas_ramp_payloads.is_empty() {
270            info!(target: "reth-bench", count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
271        }
272
273        let mut results = Vec::new();
274        let total_benchmark_duration = Instant::now();
275
276        for (i, payload) in payloads.iter().enumerate() {
277            let envelope = &payload.envelope;
278            let block_hash = payload.block_hash;
279            let execution_payload = &envelope.envelope_inner.execution_payload;
280            let inner_payload = &execution_payload.payload_inner.payload_inner;
281
282            let gas_used = inner_payload.gas_used;
283            let gas_limit = inner_payload.gas_limit;
284            let block_number = inner_payload.block_number;
285            let transaction_count =
286                execution_payload.payload_inner.payload_inner.transactions.len() as u64;
287
288            debug!(
289                target: "reth-bench",
290                payload = i + 1,
291                total = payloads.len(),
292                index = payload.index,
293                block_hash = %block_hash,
294                "Executing payload (newPayload + FCU)"
295            );
296
297            let start = Instant::now();
298
299            debug!(
300                target: "reth-bench",
301                method = "engine_newPayloadV4",
302                block_hash = %block_hash,
303                "Sending newPayload"
304            );
305
306            let status = auth_provider
307                .new_payload_v4(
308                    execution_payload.clone(),
309                    vec![],
310                    B256::ZERO,
311                    envelope.execution_requests.to_vec(),
312                )
313                .await?;
314
315            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
316
317            if !status.is_valid() {
318                return Err(eyre::eyre!("Payload rejected: {:?}", status));
319            }
320
321            let fcu_state = ForkchoiceState {
322                head_block_hash: block_hash,
323                safe_block_hash: parent_hash,
324                finalized_block_hash: parent_hash,
325            };
326
327            debug!(target: "reth-bench", method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
328
329            let fcu_result = auth_provider.fork_choice_updated_v3(fcu_state, None).await?;
330
331            let total_latency = start.elapsed();
332            let fcu_latency = total_latency - new_payload_result.latency;
333
334            let combined_result = CombinedResult {
335                block_number,
336                gas_limit,
337                transaction_count,
338                new_payload_result,
339                fcu_latency,
340                total_latency,
341            };
342
343            let current_duration = total_benchmark_duration.elapsed();
344            let progress = format!("{}/{}", i + 1, payloads.len());
345            info!(target: "reth-bench", progress, %combined_result);
346
347            if let Some(w) = &mut waiter {
348                w.on_block(block_number).await?;
349            }
350
351            let gas_row =
352                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
353            results.push((gas_row, combined_result));
354
355            debug!(target: "reth-bench", ?status, ?fcu_result, "Payload executed successfully");
356            parent_hash = block_hash;
357        }
358
359        // Drop waiter - we don't need to wait for final blocks to persist
360        // since the benchmark goal is measuring Ggas/s of newPayload/FCU, not persistence.
361        drop(waiter);
362
363        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
364            results.into_iter().unzip();
365
366        if let Some(ref path) = self.output {
367            write_benchmark_results(path, &gas_output_results, &combined_results)?;
368        }
369
370        let gas_output =
371            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
372        info!(
373            target: "reth-bench",
374            total_gas_used = gas_output.total_gas_used,
375            total_duration = ?gas_output.total_duration,
376            execution_duration = ?gas_output.execution_duration,
377            blocks_processed = gas_output.blocks_processed,
378            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
379            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
380            "Benchmark complete"
381        );
382
383        Ok(())
384    }
385
386    /// Load and parse all payload files from the directory.
387    fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
388        let mut payloads = Vec::new();
389
390        // Read directory entries
391        let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
392            .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
393            .filter_map(|e| e.ok())
394            .filter(|e| {
395                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
396                    e.file_name().to_string_lossy().starts_with("payload_block_")
397            })
398            .collect();
399
400        // Parse filenames to get indices and sort
401        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
402            .into_iter()
403            .filter_map(|e| {
404                let name = e.file_name();
405                let name_str = name.to_string_lossy();
406                // Extract index from "payload_NNN.json"
407                let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
408                let index: u64 = index_str.parse().ok()?;
409                Some((index, e.path()))
410            })
411            .collect();
412
413        indexed_paths.sort_by_key(|(idx, _)| *idx);
414
415        // Apply skip and count
416        let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
417        let indexed_paths: Vec<_> = match self.count {
418            Some(count) => indexed_paths.into_iter().take(count).collect(),
419            None => indexed_paths,
420        };
421
422        // Load each payload
423        for (index, path) in indexed_paths {
424            let content = std::fs::read_to_string(&path)
425                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
426            let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
427                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
428
429            let block_hash =
430                envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
431
432            debug!(
433                target: "reth-bench",
434                index = index,
435                block_hash = %block_hash,
436                path = %path.display(),
437                "Loaded payload"
438            );
439
440            payloads.push(LoadedPayload { index, envelope, block_hash });
441        }
442
443        Ok(payloads)
444    }
445
446    /// Load and parse gas ramp payload files from a directory.
447    fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
448        let mut payloads = Vec::new();
449
450        let entries: Vec<_> = std::fs::read_dir(dir)
451            .wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
452            .filter_map(|e| e.ok())
453            .filter(|e| {
454                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
455                    e.file_name().to_string_lossy().starts_with("payload_block_")
456            })
457            .collect();
458
459        // Parse filenames to get block numbers and sort
460        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
461            .into_iter()
462            .filter_map(|e| {
463                let name = e.file_name();
464                let name_str = name.to_string_lossy();
465                // Extract block number from "payload_block_NNN.json"
466                let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
467                let block_number: u64 = block_str.parse().ok()?;
468                Some((block_number, e.path()))
469            })
470            .collect();
471
472        indexed_paths.sort_by_key(|(num, _)| *num);
473
474        for (block_number, path) in indexed_paths {
475            let content = std::fs::read_to_string(&path)
476                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
477            let file: GasRampPayloadFile = serde_json::from_str(&content)
478                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
479
480            let version = match file.version {
481                1 => EngineApiMessageVersion::V1,
482                2 => EngineApiMessageVersion::V2,
483                3 => EngineApiMessageVersion::V3,
484                4 => EngineApiMessageVersion::V4,
485                5 => EngineApiMessageVersion::V5,
486                v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
487            };
488
489            info!(
490                block_number,
491                block_hash = %file.block_hash,
492                path = %path.display(),
493                "Loaded gas ramp payload"
494            );
495
496            payloads.push(GasRampPayload { block_number, version, file });
497        }
498
499        Ok(payloads)
500    }
501}