reth_bench/bench/
replay_payloads.rs

1//! Command for replaying pre-generated payloads from disk.
2//!
3//! This command reads `ExecutionPayloadEnvelopeV4` files from a directory and replays them
4//! in sequence using `newPayload` followed by `forkchoiceUpdated`.
5
6use crate::{
7    authenticated_transport::AuthenticatedTransportConnect,
8    bench::output::GasRampPayloadFile,
9    valid_payload::{call_forkchoice_updated, call_new_payload},
10};
11use alloy_primitives::B256;
12use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
13use alloy_rpc_client::ClientBuilder;
14use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
15use clap::Parser;
16use eyre::Context;
17use reqwest::Url;
18use reth_cli_runner::CliContext;
19use reth_node_api::EngineApiMessageVersion;
20use std::path::PathBuf;
21use tracing::{debug, info};
22
23/// `reth bench replay-payloads` command
24///
25/// Replays pre-generated payloads from a directory by calling `newPayload` followed by
26/// `forkchoiceUpdated` for each payload in sequence.
27#[derive(Debug, Parser)]
28pub struct Command {
29    /// The engine RPC URL (with JWT authentication).
30    #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
31    engine_rpc_url: String,
32
33    /// Path to the JWT secret file for engine API authentication.
34    #[arg(long, value_name = "JWT_SECRET")]
35    jwt_secret: PathBuf,
36
37    /// Directory containing payload files (`payload_block_N.json`).
38    #[arg(long, value_name = "PAYLOAD_DIR")]
39    payload_dir: PathBuf,
40
41    /// Optional limit on the number of payloads to replay.
42    /// If not specified, replays all payloads in the directory.
43    #[arg(long, value_name = "COUNT")]
44    count: Option<usize>,
45
46    /// Skip the first N payloads.
47    #[arg(long, value_name = "SKIP", default_value = "0")]
48    skip: usize,
49
50    /// Optional directory containing gas ramp payloads to replay first.
51    /// These are replayed before the main payloads to warm up the gas limit.
52    #[arg(long, value_name = "GAS_RAMP_DIR")]
53    gas_ramp_dir: Option<PathBuf>,
54}
55
56/// A loaded payload ready for execution.
57struct LoadedPayload {
58    /// The index (from filename).
59    index: u64,
60    /// The payload envelope.
61    envelope: ExecutionPayloadEnvelopeV4,
62    /// The block hash.
63    block_hash: B256,
64}
65
66/// A gas ramp payload loaded from disk.
67struct GasRampPayload {
68    /// Block number from filename.
69    block_number: u64,
70    /// Engine API version for newPayload.
71    version: EngineApiMessageVersion,
72    /// The file contents.
73    file: GasRampPayloadFile,
74}
75
76impl Command {
77    /// Execute the `replay-payloads` command.
78    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
79        info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
80
81        // Set up authenticated engine provider
82        let jwt =
83            std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
84        let jwt = JwtSecret::from_hex(jwt.trim())?;
85        let auth_url = Url::parse(&self.engine_rpc_url)?;
86
87        info!("Connecting to Engine RPC at {}", auth_url);
88        let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
89        let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
90        let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
91
92        // Get parent block (latest canonical block) - we need this for the first FCU
93        let parent_block = auth_provider
94            .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
95            .await?
96            .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
97
98        let initial_parent_hash = parent_block.header.hash;
99        let initial_parent_number = parent_block.header.number;
100
101        info!(
102            parent_hash = %initial_parent_hash,
103            parent_number = initial_parent_number,
104            "Using initial parent block"
105        );
106
107        // Load all payloads upfront to avoid I/O delays between phases
108        let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
109            let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
110            if payloads.is_empty() {
111                return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
112            }
113            info!(count = payloads.len(), "Loaded gas ramp payloads from disk");
114            payloads
115        } else {
116            Vec::new()
117        };
118
119        let payloads = self.load_payloads()?;
120        if payloads.is_empty() {
121            return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
122        }
123        info!(count = payloads.len(), "Loaded main payloads from disk");
124
125        let mut parent_hash = initial_parent_hash;
126
127        // Replay gas ramp payloads first
128        for (i, payload) in gas_ramp_payloads.iter().enumerate() {
129            info!(
130                gas_ramp_payload = i + 1,
131                total = gas_ramp_payloads.len(),
132                block_number = payload.block_number,
133                block_hash = %payload.file.block_hash,
134                "Executing gas ramp payload (newPayload + FCU)"
135            );
136
137            call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
138
139            let fcu_state = ForkchoiceState {
140                head_block_hash: payload.file.block_hash,
141                safe_block_hash: parent_hash,
142                finalized_block_hash: parent_hash,
143            };
144            call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
145
146            info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
147            parent_hash = payload.file.block_hash;
148        }
149
150        if !gas_ramp_payloads.is_empty() {
151            info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
152        }
153
154        for (i, payload) in payloads.iter().enumerate() {
155            info!(
156                payload = i + 1,
157                total = payloads.len(),
158                index = payload.index,
159                block_hash = %payload.block_hash,
160                "Executing payload (newPayload + FCU)"
161            );
162
163            self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
164
165            info!(payload = i + 1, "Payload executed successfully");
166            parent_hash = payload.block_hash;
167        }
168
169        info!(count = payloads.len(), "All payloads replayed successfully");
170        Ok(())
171    }
172
173    /// Load and parse all payload files from the directory.
174    fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
175        let mut payloads = Vec::new();
176
177        // Read directory entries
178        let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
179            .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
180            .filter_map(|e| e.ok())
181            .filter(|e| {
182                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
183                    e.file_name().to_string_lossy().starts_with("payload_block_")
184            })
185            .collect();
186
187        // Parse filenames to get indices and sort
188        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
189            .into_iter()
190            .filter_map(|e| {
191                let name = e.file_name();
192                let name_str = name.to_string_lossy();
193                // Extract index from "payload_NNN.json"
194                let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
195                let index: u64 = index_str.parse().ok()?;
196                Some((index, e.path()))
197            })
198            .collect();
199
200        indexed_paths.sort_by_key(|(idx, _)| *idx);
201
202        // Apply skip and count
203        let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
204        let indexed_paths: Vec<_> = match self.count {
205            Some(count) => indexed_paths.into_iter().take(count).collect(),
206            None => indexed_paths,
207        };
208
209        // Load each payload
210        for (index, path) in indexed_paths {
211            let content = std::fs::read_to_string(&path)
212                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
213            let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
214                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
215
216            let block_hash =
217                envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
218
219            info!(
220                index = index,
221                block_hash = %block_hash,
222                path = %path.display(),
223                "Loaded payload"
224            );
225
226            payloads.push(LoadedPayload { index, envelope, block_hash });
227        }
228
229        Ok(payloads)
230    }
231
232    /// Load and parse gas ramp payload files from a directory.
233    fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
234        let mut payloads = Vec::new();
235
236        let entries: Vec<_> = std::fs::read_dir(dir)
237            .wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
238            .filter_map(|e| e.ok())
239            .filter(|e| {
240                e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
241                    e.file_name().to_string_lossy().starts_with("payload_block_")
242            })
243            .collect();
244
245        // Parse filenames to get block numbers and sort
246        let mut indexed_paths: Vec<(u64, PathBuf)> = entries
247            .into_iter()
248            .filter_map(|e| {
249                let name = e.file_name();
250                let name_str = name.to_string_lossy();
251                // Extract block number from "payload_block_NNN.json"
252                let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
253                let block_number: u64 = block_str.parse().ok()?;
254                Some((block_number, e.path()))
255            })
256            .collect();
257
258        indexed_paths.sort_by_key(|(num, _)| *num);
259
260        for (block_number, path) in indexed_paths {
261            let content = std::fs::read_to_string(&path)
262                .wrap_err_with(|| format!("Failed to read {:?}", path))?;
263            let file: GasRampPayloadFile = serde_json::from_str(&content)
264                .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
265
266            let version = match file.version {
267                1 => EngineApiMessageVersion::V1,
268                2 => EngineApiMessageVersion::V2,
269                3 => EngineApiMessageVersion::V3,
270                4 => EngineApiMessageVersion::V4,
271                5 => EngineApiMessageVersion::V5,
272                v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
273            };
274
275            info!(
276                block_number,
277                block_hash = %file.block_hash,
278                path = %path.display(),
279                "Loaded gas ramp payload"
280            );
281
282            payloads.push(GasRampPayload { block_number, version, file });
283        }
284
285        Ok(payloads)
286    }
287
288    async fn execute_payload_v4(
289        &self,
290        provider: &RootProvider<AnyNetwork>,
291        envelope: &ExecutionPayloadEnvelopeV4,
292        parent_hash: B256,
293    ) -> eyre::Result<()> {
294        let block_hash =
295            envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
296
297        debug!(
298            method = "engine_newPayloadV4",
299            block_hash = %block_hash,
300            "Sending newPayload"
301        );
302
303        let status = provider
304            .new_payload_v4(
305                envelope.envelope_inner.execution_payload.clone(),
306                vec![],
307                B256::ZERO,
308                envelope.execution_requests.to_vec(),
309            )
310            .await?;
311
312        info!(?status, "newPayloadV4 response");
313
314        if !status.is_valid() {
315            return Err(eyre::eyre!("Payload rejected: {:?}", status));
316        }
317
318        let fcu_state = ForkchoiceState {
319            head_block_hash: block_hash,
320            safe_block_hash: parent_hash,
321            finalized_block_hash: parent_hash,
322        };
323
324        debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
325
326        let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
327
328        info!(?fcu_result, "forkchoiceUpdatedV3 response");
329
330        Ok(())
331    }
332}