1use crate::{
7 authenticated_transport::AuthenticatedTransportConnect,
8 bench::output::GasRampPayloadFile,
9 valid_payload::{call_forkchoice_updated, call_new_payload},
10};
11use alloy_primitives::B256;
12use alloy_provider::{ext::EngineApi, network::AnyNetwork, Provider, RootProvider};
13use alloy_rpc_client::ClientBuilder;
14use alloy_rpc_types_engine::{ExecutionPayloadEnvelopeV4, ForkchoiceState, JwtSecret};
15use clap::Parser;
16use eyre::Context;
17use reqwest::Url;
18use reth_cli_runner::CliContext;
19use reth_node_api::EngineApiMessageVersion;
20use std::path::PathBuf;
21use tracing::{debug, info};
22
23#[derive(Debug, Parser)]
28pub struct Command {
29 #[arg(long, value_name = "ENGINE_RPC_URL", default_value = "http://localhost:8551")]
31 engine_rpc_url: String,
32
33 #[arg(long, value_name = "JWT_SECRET")]
35 jwt_secret: PathBuf,
36
37 #[arg(long, value_name = "PAYLOAD_DIR")]
39 payload_dir: PathBuf,
40
41 #[arg(long, value_name = "COUNT")]
44 count: Option<usize>,
45
46 #[arg(long, value_name = "SKIP", default_value = "0")]
48 skip: usize,
49
50 #[arg(long, value_name = "GAS_RAMP_DIR")]
53 gas_ramp_dir: Option<PathBuf>,
54}
55
56struct LoadedPayload {
58 index: u64,
60 envelope: ExecutionPayloadEnvelopeV4,
62 block_hash: B256,
64}
65
66struct GasRampPayload {
68 block_number: u64,
70 version: EngineApiMessageVersion,
72 file: GasRampPayloadFile,
74}
75
76impl Command {
77 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
79 info!(payload_dir = %self.payload_dir.display(), "Replaying payloads");
80
81 let jwt =
83 std::fs::read_to_string(&self.jwt_secret).wrap_err("Failed to read JWT secret file")?;
84 let jwt = JwtSecret::from_hex(jwt.trim())?;
85 let auth_url = Url::parse(&self.engine_rpc_url)?;
86
87 info!("Connecting to Engine RPC at {}", auth_url);
88 let auth_transport = AuthenticatedTransportConnect::new(auth_url.clone(), jwt);
89 let auth_client = ClientBuilder::default().connect_with(auth_transport).await?;
90 let auth_provider = RootProvider::<AnyNetwork>::new(auth_client);
91
92 let parent_block = auth_provider
94 .get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
95 .await?
96 .ok_or_else(|| eyre::eyre!("Failed to fetch latest block"))?;
97
98 let initial_parent_hash = parent_block.header.hash;
99 let initial_parent_number = parent_block.header.number;
100
101 info!(
102 parent_hash = %initial_parent_hash,
103 parent_number = initial_parent_number,
104 "Using initial parent block"
105 );
106
107 let gas_ramp_payloads = if let Some(ref gas_ramp_dir) = self.gas_ramp_dir {
109 let payloads = self.load_gas_ramp_payloads(gas_ramp_dir)?;
110 if payloads.is_empty() {
111 return Err(eyre::eyre!("No gas ramp payload files found in {:?}", gas_ramp_dir));
112 }
113 info!(count = payloads.len(), "Loaded gas ramp payloads from disk");
114 payloads
115 } else {
116 Vec::new()
117 };
118
119 let payloads = self.load_payloads()?;
120 if payloads.is_empty() {
121 return Err(eyre::eyre!("No payload files found in {:?}", self.payload_dir));
122 }
123 info!(count = payloads.len(), "Loaded main payloads from disk");
124
125 let mut parent_hash = initial_parent_hash;
126
127 for (i, payload) in gas_ramp_payloads.iter().enumerate() {
129 info!(
130 gas_ramp_payload = i + 1,
131 total = gas_ramp_payloads.len(),
132 block_number = payload.block_number,
133 block_hash = %payload.file.block_hash,
134 "Executing gas ramp payload (newPayload + FCU)"
135 );
136
137 call_new_payload(&auth_provider, payload.version, payload.file.params.clone()).await?;
138
139 let fcu_state = ForkchoiceState {
140 head_block_hash: payload.file.block_hash,
141 safe_block_hash: parent_hash,
142 finalized_block_hash: parent_hash,
143 };
144 call_forkchoice_updated(&auth_provider, payload.version, fcu_state, None).await?;
145
146 info!(gas_ramp_payload = i + 1, "Gas ramp payload executed successfully");
147 parent_hash = payload.file.block_hash;
148 }
149
150 if !gas_ramp_payloads.is_empty() {
151 info!(count = gas_ramp_payloads.len(), "All gas ramp payloads replayed");
152 }
153
154 for (i, payload) in payloads.iter().enumerate() {
155 info!(
156 payload = i + 1,
157 total = payloads.len(),
158 index = payload.index,
159 block_hash = %payload.block_hash,
160 "Executing payload (newPayload + FCU)"
161 );
162
163 self.execute_payload_v4(&auth_provider, &payload.envelope, parent_hash).await?;
164
165 info!(payload = i + 1, "Payload executed successfully");
166 parent_hash = payload.block_hash;
167 }
168
169 info!(count = payloads.len(), "All payloads replayed successfully");
170 Ok(())
171 }
172
173 fn load_payloads(&self) -> eyre::Result<Vec<LoadedPayload>> {
175 let mut payloads = Vec::new();
176
177 let entries: Vec<_> = std::fs::read_dir(&self.payload_dir)
179 .wrap_err_with(|| format!("Failed to read directory {:?}", self.payload_dir))?
180 .filter_map(|e| e.ok())
181 .filter(|e| {
182 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
183 e.file_name().to_string_lossy().starts_with("payload_block_")
184 })
185 .collect();
186
187 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
189 .into_iter()
190 .filter_map(|e| {
191 let name = e.file_name();
192 let name_str = name.to_string_lossy();
193 let index_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
195 let index: u64 = index_str.parse().ok()?;
196 Some((index, e.path()))
197 })
198 .collect();
199
200 indexed_paths.sort_by_key(|(idx, _)| *idx);
201
202 let indexed_paths: Vec<_> = indexed_paths.into_iter().skip(self.skip).collect();
204 let indexed_paths: Vec<_> = match self.count {
205 Some(count) => indexed_paths.into_iter().take(count).collect(),
206 None => indexed_paths,
207 };
208
209 for (index, path) in indexed_paths {
211 let content = std::fs::read_to_string(&path)
212 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
213 let envelope: ExecutionPayloadEnvelopeV4 = serde_json::from_str(&content)
214 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
215
216 let block_hash =
217 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
218
219 info!(
220 index = index,
221 block_hash = %block_hash,
222 path = %path.display(),
223 "Loaded payload"
224 );
225
226 payloads.push(LoadedPayload { index, envelope, block_hash });
227 }
228
229 Ok(payloads)
230 }
231
232 fn load_gas_ramp_payloads(&self, dir: &PathBuf) -> eyre::Result<Vec<GasRampPayload>> {
234 let mut payloads = Vec::new();
235
236 let entries: Vec<_> = std::fs::read_dir(dir)
237 .wrap_err_with(|| format!("Failed to read directory {:?}", dir))?
238 .filter_map(|e| e.ok())
239 .filter(|e| {
240 e.path().extension().and_then(|s| s.to_str()) == Some("json") &&
241 e.file_name().to_string_lossy().starts_with("payload_block_")
242 })
243 .collect();
244
245 let mut indexed_paths: Vec<(u64, PathBuf)> = entries
247 .into_iter()
248 .filter_map(|e| {
249 let name = e.file_name();
250 let name_str = name.to_string_lossy();
251 let block_str = name_str.strip_prefix("payload_block_")?.strip_suffix(".json")?;
253 let block_number: u64 = block_str.parse().ok()?;
254 Some((block_number, e.path()))
255 })
256 .collect();
257
258 indexed_paths.sort_by_key(|(num, _)| *num);
259
260 for (block_number, path) in indexed_paths {
261 let content = std::fs::read_to_string(&path)
262 .wrap_err_with(|| format!("Failed to read {:?}", path))?;
263 let file: GasRampPayloadFile = serde_json::from_str(&content)
264 .wrap_err_with(|| format!("Failed to parse {:?}", path))?;
265
266 let version = match file.version {
267 1 => EngineApiMessageVersion::V1,
268 2 => EngineApiMessageVersion::V2,
269 3 => EngineApiMessageVersion::V3,
270 4 => EngineApiMessageVersion::V4,
271 5 => EngineApiMessageVersion::V5,
272 v => return Err(eyre::eyre!("Invalid version {} in {:?}", v, path)),
273 };
274
275 info!(
276 block_number,
277 block_hash = %file.block_hash,
278 path = %path.display(),
279 "Loaded gas ramp payload"
280 );
281
282 payloads.push(GasRampPayload { block_number, version, file });
283 }
284
285 Ok(payloads)
286 }
287
288 async fn execute_payload_v4(
289 &self,
290 provider: &RootProvider<AnyNetwork>,
291 envelope: &ExecutionPayloadEnvelopeV4,
292 parent_hash: B256,
293 ) -> eyre::Result<()> {
294 let block_hash =
295 envelope.envelope_inner.execution_payload.payload_inner.payload_inner.block_hash;
296
297 debug!(
298 method = "engine_newPayloadV4",
299 block_hash = %block_hash,
300 "Sending newPayload"
301 );
302
303 let status = provider
304 .new_payload_v4(
305 envelope.envelope_inner.execution_payload.clone(),
306 vec![],
307 B256::ZERO,
308 envelope.execution_requests.to_vec(),
309 )
310 .await?;
311
312 info!(?status, "newPayloadV4 response");
313
314 if !status.is_valid() {
315 return Err(eyre::eyre!("Payload rejected: {:?}", status));
316 }
317
318 let fcu_state = ForkchoiceState {
319 head_block_hash: block_hash,
320 safe_block_hash: parent_hash,
321 finalized_block_hash: parent_hash,
322 };
323
324 debug!(method = "engine_forkchoiceUpdatedV3", ?fcu_state, "Sending forkchoiceUpdated");
325
326 let fcu_result = provider.fork_choice_updated_v3(fcu_state, None).await?;
327
328 info!(?fcu_result, "forkchoiceUpdatedV3 response");
329
330 Ok(())
331 }
332}