Skip to main content

reth_bench/bench/
replay_payloads.rs

1//! Command for replaying pre-generated payloads from disk.
2
3use crate::{
4    authenticated_transport::AuthenticatedTransportConnect,
5    bench::{
6        helpers::parse_duration,
7        metrics_scraper::MetricsScraper,
8        output::{
9            write_benchmark_results, CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow,
10        },
11    },
12    valid_payload::{call_forkchoice_updated_with_reth, call_new_payload_with_reth},
13};
14use alloy_primitives::B256;
15use alloy_provider::{network::AnyNetwork, Provider, RootProvider};
16use alloy_rpc_client::ClientBuilder;
17use alloy_rpc_types_engine::{
18    CancunPayloadFields, ExecutionData, ExecutionPayloadEnvelopeV4, ExecutionPayloadSidecar,
19    ForkchoiceState, JwtSecret, PraguePayloadFields,
20};
21use clap::Parser;
22use eyre::Context;
23use reth_cli_runner::CliContext;
24use reth_node_api::EngineApiMessageVersion;
25use reth_node_core::args::WaitForPersistence;
26use reth_rpc_api::RethNewPayloadInput;
27use std::{
28    path::PathBuf,
29    time::{Duration, Instant},
30};
31use tracing::{debug, info, warn};
32use url::Url;
33
34/// `reth bench replay-payloads` command
35///
36/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
37/// `forkchoiceUpdated` for each payload in sequence.
38#[derive(Debug, Parser)]
39pub struct Command {
40    /// The engine RPC URL (with JWT authentication).
41    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
42    engine_rpc_url: String,
43
44    /// Path to the JWT secret file for engine API authentication.
45    #[arg(long, value_name = "JWT_SECRET")]
46    jwt_secret: PathBuf,
47
48    /// Directory containing payload files (`payload_block_N.json`).
49    #[arg(long, value_name = "PAYLOAD_DIR")]
50    payload_dir: PathBuf,
51
52    /// Optional limit on the number of payloads to replay.
53    /// If not specified, replays all payloads in the directory.
54    #[arg(long, value_name = "COUNT")]
55    count: Option<usize>,
56
57    /// Skip the first N payloads.
58    #[arg(long, value_name = "SKIP", default_value = "0")]
59    skip: usize,
60
61    /// Deprecated: gas ramp is no longer needed. Use `--testing.skip-gas-limit-ramp-check`
62    /// and `--testing.gas-limit` on the reth node instead. This flag is accepted but ignored.
63    #[arg(long, value_name = "GAS_RAMP_DIR", hide = true)]
64    gas_ramp_dir: Option<PathBuf>,
65
66    /// Optional output directory for benchmark results (CSV files).
67    #[arg(long, value_name = "OUTPUT")]
68    output: Option<PathBuf>,
69
70    /// How long to wait after a forkchoice update before sending the next payload.
71    ///
72    /// Accepts a duration string (e.g. `100ms`, `2s`) or a bare integer treated as
73    /// milliseconds (e.g. `400`).
74    #[arg(long, value_name = "WAIT_TIME", value_parser = parse_duration, verbatim_doc_comment)]
75    wait_time: Option<Duration>,
76
77    /// Use `reth_newPayload` endpoint instead of `engine_newPayload*`.
78    ///
79    /// The `reth_newPayload` endpoint is a reth-specific extension that takes `ExecutionData`
80    /// directly, waits for persistence and cache updates to complete before processing,
81    /// and returns server-side timing breakdowns (latency, persistence wait, cache wait).
82    #[arg(long, default_value = "false", verbatim_doc_comment)]
83    reth_new_payload: bool,
84
85    /// Control when `reth_newPayload` waits for in-flight persistence.
86    ///
87    /// Accepts `always` (default — wait on every block), `never`, or a number N
88    /// to wait every N blocks and skip the rest.
89    ///
90    /// Requires `--reth-new-payload`.
91    #[arg(
92        long = "wait-for-persistence",
93        value_name = "MODE",
94        num_args = 0..=1,
95        default_missing_value = "always",
96        value_parser = clap::value_parser!(WaitForPersistence),
97        requires = "reth_new_payload",
98        verbatim_doc_comment
99    )]
100    wait_for_persistence: Option<WaitForPersistence>,
101
102    /// Skip waiting for execution cache and sparse trie locks before processing.
103    ///
104    /// Only works with `--reth-new-payload`. When set, passes `wait_for_caches: false`
105    /// to the `reth_newPayload` endpoint.
106    #[arg(long, default_value = "false", verbatim_doc_comment, requires = "reth_new_payload")]
107    no_wait_for_caches: bool,
108
109    /// Optional Prometheus metrics endpoint to scrape after each block.
110    ///
111    /// When provided, reth-bench will fetch metrics from this URL after each
112    /// payload, recording per-block execution and state root durations.
113    /// Results are written to `metrics.csv` in the output directory.
114    #[arg(long = "metrics-url", value_name = "URL", verbatim_doc_comment)]
115    metrics_url: Option<String>,
116}
117
118/// A loaded payload ready for execution.
119struct LoadedPayload {
120    /// The index (from filename).
121    index: u64,
122    /// The payload envelope.
123    envelope: ExecutionPayloadEnvelopeV4,
124    /// The block hash.
125    block_hash: B256,
126}
127
128impl Command {
129    /// Execute the `replay-payloads` command.
130    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
131        info!(target: "reth-bench", payload_dir = %self.payload_dir.display(), "Replaying payloads");
132
133        // Log mode configuration
134        if let Some(duration) = self.wait_time {
135            info!(target: "reth-bench", "Using wait-time mode with {}ms delay between blocks", duration.as_millis());
136        }
137        if self.reth_new_payload {
138            info!("Using reth_newPayload and reth_forkchoiceUpdated endpoints");
139        }
140
141        let mut metrics_scraper = MetricsScraper::maybe_new(self.metrics_url.clone());
142
143        // Set up authenticated engine provider
144        let jwt =
145            std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
146        let jwt = JwtSecret::from_hex(jwt.trim())?;
147        let auth_url = Url::parse(&self.engine_rpc_url)?;
148
149        info!(target: "reth-bench", "Connecting to Engine RPC at {}", auth_url);
150        let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
151        let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
152        let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
153
154        // Get parent block (latest canonical block) - we need this for the first FCU
155        let parent_block = auth_provider
156            .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
157            .await?
158            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
159
160        let initial_parent_hash = parent_block.header.hash;
161        let initial_parent_number = parent_block.header.number;
162
163        info!(
164            target: "reth-bench",
165            parent_hash = %initial_parent_hash,
166            parent_number = initial_parent_number,
167            "Using initial parent block"
168        );
169
170        // Warn if deprecated --gas-ramp-dir is passed
171        if self.gas_ramp_dir.is_some() {
172            warn!(
173                target: "reth-bench",
174                "--gas-ramp-dir is deprecated and ignored. Use --testing.skip-gas-limit-ramp-check \
175                 and --testing.gas-limit on the reth node instead."
176            );
177        }
178
179        // Load all payloads upfront to avoid I/O delays between phases
180        let payloads = self.load_payloads()?;
181        if payloads.is_empty() {
182            return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
183        }
184        info!(target: "reth-bench", count = payloads.len(), "Loaded main payloads from disk");
185
186        let mut parent_hash = initial_parent_hash;
187
188        let mut results = Vec::new();
189        let total_benchmark_duration = Instant::now();
190
191        for (i, payload) in payloads.iter().enumerate() {
192            let envelope = &payload.envelope;
193            let block_hash = payload.block_hash;
194            let execution_payload = &envelope.envelope_inner.execution_payload;
195            let inner_payload = &execution_payload.payload_inner.payload_inner;
196
197            let gas_used = inner_payload.gas_used;
198            let gas_limit = inner_payload.gas_limit;
199            let block_number = inner_payload.block_number;
200            let transaction_count =
201                execution_payload.payload_inner.payload_inner.transactions.len() as u64;
202
203            debug!(
204                target: "reth-bench",
205                payload = i + 1,
206                total = payloads.len(),
207                index = payload.index,
208                block_hash = %block_hash,
209                "Executing payload (newPayload + FCU)"
210            );
211
212            let start = Instant::now();
213
214            debug!(
215                target: "reth-bench",
216                method = "engine_newPayloadV4",
217                block_hash = %block_hash,
218                "Sending newPayload"
219            );
220
221            let use_reth = self.reth_new_payload;
222            let (version, params) = if use_reth {
223                let wait_for_persistence = self
224                    .wait_for_persistence
225                    .unwrap_or(WaitForPersistence::Never)
226                    .rpc_value(block_number);
227                let reth_data = ExecutionData {
228                    payload: execution_payload.clone().into(),
229                    sidecar: ExecutionPayloadSidecar::v4(
230                        CancunPayloadFields {
231                            versioned_hashes: Vec::new(),
232                            parent_beacon_block_root: B256::ZERO,
233                        },
234                        PraguePayloadFields {
235                            requests: envelope.execution_requests.clone().into(),
236                        },
237                    ),
238                };
239                (
240                    None,
241                    serde_json::to_value((
242                        RethNewPayloadInput::ExecutionData(reth_data),
243                        wait_for_persistence,
244                        self.no_wait_for_caches.then_some(false),
245                    ))?,
246                )
247            } else {
248                (
249                    Some(EngineApiMessageVersion::V4),
250                    serde_json::to_value((
251                        execution_payload.clone(),
252                        Vec::<B256>::new(),
253                        B256::ZERO,
254                        envelope.execution_requests.to_vec(),
255                    ))?,
256                )
257            };
258
259            let server_timings =
260                call_new_payload_with_reth(&auth_provider, version, params).await?;
261
262            let np_latency =
263                server_timings.as_ref().map(|t| t.latency).unwrap_or_else(|| start.elapsed());
264            let new_payload_result = NewPayloadResult {
265                gas_used,
266                latency: np_latency,
267                persistence_wait: server_timings.as_ref().and_then(|t| t.persistence_wait),
268                execution_cache_wait: server_timings
269                    .as_ref()
270                    .map(|t| t.execution_cache_wait)
271                    .unwrap_or_default(),
272                sparse_trie_wait: server_timings
273                    .as_ref()
274                    .map(|t| t.sparse_trie_wait)
275                    .unwrap_or_default(),
276            };
277
278            let fcu_state = ForkchoiceState {
279                head_block_hash: block_hash,
280                safe_block_hash: parent_hash,
281                finalized_block_hash: parent_hash,
282            };
283
284            let fcu_start = Instant::now();
285            call_forkchoice_updated_with_reth(&auth_provider, version, fcu_state).await?;
286            let fcu_latency = fcu_start.elapsed();
287
288            let total_latency =
289                if server_timings.is_some() { np_latency + fcu_latency } else { start.elapsed() };
290
291            let combined_result = CombinedResult {
292                block_number,
293                gas_limit,
294                transaction_count,
295                new_payload_result,
296                fcu_latency,
297                total_latency,
298            };
299
300            let current_duration = total_benchmark_duration.elapsed();
301            let progress = format!("{}/{}", i + 1, payloads.len());
302            info!(target: "reth-bench", progress, %combined_result);
303
304            if let Some(scraper) = metrics_scraper.as_mut() &&
305                let Err(err) = scraper.scrape_after_block(block_number).await
306            {
307                tracing::warn!(target: "reth-bench", %err, block_number, "Failed to scrape metrics");
308            }
309
310            let gas_row =
311                TotalGasRow { block_number, transaction_count, gas_used, time: current_duration };
312            results.push((gas_row, combined_result));
313
314            parent_hash = block_hash;
315        }
316
317        let (gas_output_results, combined_results): (Vec<TotalGasRow>, Vec<CombinedResult>) =
318            results.into_iter().unzip();
319
320        if let Some(ref path) = self.output {
321            write_benchmark_results(path, &gas_output_results, &combined_results)?;
322        }
323
324        if let (Some(path), Some(scraper)) = (&self.output, &metrics_scraper) {
325            scraper.write_csv(path)?;
326        }
327
328        let gas_output =
329            TotalGasOutput::with_combined_results(gas_output_results, &combined_results)?;
330        info!(
331            target: "reth-bench",
332            total_gas_used = gas_output.total_gas_used,
333            total_duration = ?gas_output.total_duration,
334            execution_duration = ?gas_output.execution_duration,
335            blocks_processed = gas_output.blocks_processed,
336            wall_clock_ggas_per_second = format_args!("{:.4}", gas_output.total_gigagas_per_second()),
337            execution_ggas_per_second = format_args!("{:.4}", gas_output.execution_gigagas_per_second()),
338            "Benchmark complete"
339        );
340
341        Ok(())
342    }
343
344    /// Load and parse all payload files from the directory.
345    fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
346        let mut payloads = Vec::new();
347
348        // Read directory entries
349        let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
350            .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
351            .filter_map(|e| e.ok())
352            .filter(|e| {
353                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
354                    e.file_name().to_string_lossy().starts_with("payload_block_")
355            })
356            .collect();
357
358        // Parse filenames to get indices and sort
359        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
360            .into_iter()
361            .filter_map(|e| {
362                let name = e.file_name();
363                let name_str = name.to_string_lossy();
364                // Extract index from "payload_NNN.json"
365                let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
366                let index: u64 = index_str.parse().ok()?;
367                Some((index, e.path()))
368            })
369            .collect();
370
371        indexed_paths.sort_by_key(|(idx, _)| *idx);
372
373        // Apply skip and count
374        let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
375        let indexed_paths: Vec<_> = match self.count {
376            Some(count) => indexed_paths.into_iter().take(count).collect(),
377            None => indexed_paths,
378        };
379
380        // Load each payload
381        for (index, path) in indexed_paths {
382            let content = std::fs::read_to_string(&path)
383                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
384            let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
385                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
386
387            let block_hash =
388                envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
389
390            debug!(
391                target: "reth-bench",
392                index = index,
393                block_hash = %block_hash,
394                path = %path.display(),
395                "Loaded payload"
396            );
397
398            payloads.push(LoadedPayload { index, envelope, block_hash });
399        }
400
401        Ok(payloads)
402    }
403}