1use alloy_eips::eip7685::Requests;
6use alloy_primitives::{Bytes, B256};
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9 ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
10 ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use reth_node_core::args::WaitForPersistence;
16use reth_rpc_api::RethNewPayloadInput;
17use serde::Deserialize;
18use std::time::Duration;
19use tracing::{debug, error};
20
21#[async_trait::async_trait]
23pub trait EngineApiValidWaitExt<N>: Send + Sync {
24 async fn fork_choice_updated_v1_wait(
27 &self,
28 fork_choice_state: ForkchoiceState,
29 payload_attributes: Option<PayloadAttributes>,
30 ) -> TransportResult<ForkchoiceUpdated>;
31
32 async fn fork_choice_updated_v2_wait(
35 &self,
36 fork_choice_state: ForkchoiceState,
37 payload_attributes: Option<PayloadAttributes>,
38 ) -> TransportResult<ForkchoiceUpdated>;
39
40 async fn fork_choice_updated_v3_wait(
43 &self,
44 fork_choice_state: ForkchoiceState,
45 payload_attributes: Option<PayloadAttributes>,
46 ) -> TransportResult<ForkchoiceUpdated>;
47}
48
49#[async_trait::async_trait]
50impl<N, P> EngineApiValidWaitExt<N> for P
51where
52 N: Network,
53 P: Provider<N> + EngineApi<N>,
54{
55 async fn fork_choice_updated_v1_wait(
56 &self,
57 fork_choice_state: ForkchoiceState,
58 payload_attributes: Option<PayloadAttributes>,
59 ) -> TransportResult<ForkchoiceUpdated> {
60 debug!(
61 target: "reth-bench",
62 method = "engine_forkchoiceUpdatedV1",
63 ?fork_choice_state,
64 ?payload_attributes,
65 "Sending forkchoiceUpdated"
66 );
67
68 let mut status =
69 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
70
71 while !status.is_valid() {
72 if status.is_invalid() {
73 error!(
74 target: "reth-bench",
75 ?status,
76 ?fork_choice_state,
77 ?payload_attributes,
78 "Invalid forkchoiceUpdatedV1 message",
79 );
80 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
81 }
82 if status.is_syncing() {
83 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
84 "invalid range: no canonical state found for parent of requested block",
85 ))
86 }
87 status =
88 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
89 }
90
91 Ok(status)
92 }
93
94 async fn fork_choice_updated_v2_wait(
95 &self,
96 fork_choice_state: ForkchoiceState,
97 payload_attributes: Option<PayloadAttributes>,
98 ) -> TransportResult<ForkchoiceUpdated> {
99 debug!(
100 target: "reth-bench",
101 method = "engine_forkchoiceUpdatedV2",
102 ?fork_choice_state,
103 ?payload_attributes,
104 "Sending forkchoiceUpdated"
105 );
106
107 let mut status =
108 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
109
110 while !status.is_valid() {
111 if status.is_invalid() {
112 error!(
113 target: "reth-bench",
114 ?status,
115 ?fork_choice_state,
116 ?payload_attributes,
117 "Invalid forkchoiceUpdatedV2 message",
118 );
119 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
120 }
121 if status.is_syncing() {
122 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
123 "invalid range: no canonical state found for parent of requested block",
124 ))
125 }
126 status =
127 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
128 }
129
130 Ok(status)
131 }
132
133 async fn fork_choice_updated_v3_wait(
134 &self,
135 fork_choice_state: ForkchoiceState,
136 payload_attributes: Option<PayloadAttributes>,
137 ) -> TransportResult<ForkchoiceUpdated> {
138 debug!(
139 target: "reth-bench",
140 method = "engine_forkchoiceUpdatedV3",
141 ?fork_choice_state,
142 ?payload_attributes,
143 "Sending forkchoiceUpdated"
144 );
145
146 let mut status =
147 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
148
149 while !status.is_valid() {
150 if status.is_invalid() {
151 error!(
152 target: "reth-bench",
153 ?status,
154 ?fork_choice_state,
155 ?payload_attributes,
156 "Invalid forkchoiceUpdatedV3 message",
157 );
158 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
159 }
160 status =
161 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
162 }
163
164 Ok(status)
165 }
166}
167
168pub(crate) fn block_to_new_payload(
175 block: AnyRpcBlock,
176 is_optimism: bool,
177 rlp: Option<Bytes>,
178 reth_new_payload: bool,
179 wait_for_persistence: WaitForPersistence,
180 no_wait_for_caches: bool,
181) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
182 let block_number = block.header.number;
183 let wait_for_persistence = wait_for_persistence.rpc_value(block_number);
184
185 if let Some(rlp) = rlp {
186 return Ok((
187 None,
188 serde_json::to_value((
189 RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),
190 wait_for_persistence,
191 no_wait_for_caches.then_some(false),
192 ))?,
193 ));
194 }
195 let block = block
196 .into_inner()
197 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
198 .try_map_transactions(|tx| {
199 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
201 })?
202 .into_consensus();
203
204 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
206 let (version, params, execution_data) =
207 payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
208
209 if reth_new_payload {
210 Ok((
211 None,
212 serde_json::to_value((
213 RethNewPayloadInput::ExecutionData(execution_data),
214 wait_for_persistence,
215 no_wait_for_caches.then_some(false),
216 ))?,
217 ))
218 } else {
219 Ok((Some(version), params))
220 }
221}
222
223pub(crate) fn payload_to_new_payload(
228 payload: ExecutionPayload,
229 sidecar: ExecutionPayloadSidecar,
230 is_optimism: bool,
231 withdrawals_root: Option<B256>,
232 target_version: Option<EngineApiMessageVersion>,
233) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
234 let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
235
236 let (version, params) = match payload {
237 ExecutionPayload::V3(payload) => {
238 let cancun = sidecar.cancun().unwrap();
239
240 if let Some(prague) = sidecar.prague() {
241 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
243
244 if is_optimism {
245 let withdrawals_root = withdrawals_root.ok_or_else(|| {
246 eyre::eyre!("Missing withdrawals root for Optimism payload")
247 })?;
248 (
249 version,
250 serde_json::to_value((
251 OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
252 cancun.versioned_hashes.clone(),
253 cancun.parent_beacon_block_root,
254 Requests::default(),
255 ))?,
256 )
257 } else {
258 let requests = prague.requests.clone();
260 (
261 version,
262 serde_json::to_value((
263 payload,
264 cancun.versioned_hashes.clone(),
265 cancun.parent_beacon_block_root,
266 requests,
267 ))?,
268 )
269 }
270 } else {
271 (
272 EngineApiMessageVersion::V3,
273 serde_json::to_value((
274 payload,
275 cancun.versioned_hashes.clone(),
276 cancun.parent_beacon_block_root,
277 ))?,
278 )
279 }
280 }
281 ExecutionPayload::V2(payload) => {
282 let input = ExecutionPayloadInputV2 {
283 execution_payload: payload.payload_inner,
284 withdrawals: Some(payload.withdrawals),
285 };
286
287 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
288 }
289 ExecutionPayload::V1(payload) => {
290 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
291 }
292 };
293
294 Ok((version, params, execution_data))
295}
296
297#[allow(dead_code)]
303pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
304 provider: P,
305 version: Option<EngineApiMessageVersion>,
306 params: serde_json::Value,
307) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
308 call_new_payload_with_reth(provider, version, params).await
309}
310
311#[derive(Debug, Deserialize)]
313struct RethPayloadStatus {
314 latency_us: u64,
315 #[serde(default)]
316 persistence_wait_us: Option<u64>,
317 #[serde(default)]
318 execution_cache_wait_us: u64,
319 #[serde(default)]
320 sparse_trie_wait_us: u64,
321}
322
323#[derive(Debug, Clone, Copy, Default)]
325pub(crate) struct NewPayloadTimingBreakdown {
326 pub(crate) latency: Duration,
328 pub(crate) persistence_wait: Option<Duration>,
330 pub(crate) execution_cache_wait: Duration,
332 pub(crate) sparse_trie_wait: Duration,
334}
335
336pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
344 provider: P,
345 version: Option<EngineApiMessageVersion>,
346 params: serde_json::Value,
347) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
348 let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
349
350 debug!(target: "reth-bench", method, "Sending newPayload");
351
352 let resp = loop {
353 let resp: serde_json::Value = provider.client().request(method, ¶ms).await?;
354 let status = PayloadStatus::deserialize(&resp)?;
355
356 if status.is_valid() {
357 break resp;
358 }
359 if status.is_invalid() {
360 return Err(eyre::eyre!("Invalid {method}: {status:?}"));
361 }
362 if status.is_syncing() {
363 return Err(eyre::eyre!(
364 "invalid range: no canonical state found for parent of requested block"
365 ));
366 }
367 };
368
369 if version.is_some() {
370 return Ok(None);
371 }
372
373 let resp: RethPayloadStatus = serde_json::from_value(resp)?;
374
375 Ok(Some(NewPayloadTimingBreakdown {
376 latency: Duration::from_micros(resp.latency_us),
377 persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
378 execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
379 sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
380 }))
381}
382
383pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
389 provider: P,
390 message_version: EngineApiMessageVersion,
391 forkchoice_state: ForkchoiceState,
392 payload_attributes: Option<PayloadAttributes>,
393) -> TransportResult<ForkchoiceUpdated> {
394 match message_version {
396 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
397 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
398 }
399 EngineApiMessageVersion::V2 => {
400 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
401 }
402 EngineApiMessageVersion::V1 => {
403 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
404 }
405 }
406}
407
408pub(crate) async fn call_forkchoice_updated_with_reth<
414 N: Network,
415 P: Provider<N> + EngineApiValidWaitExt<N>,
416>(
417 provider: P,
418 message_version: Option<EngineApiMessageVersion>,
419 forkchoice_state: ForkchoiceState,
420) -> TransportResult<ForkchoiceUpdated> {
421 if let Some(message_version) = message_version {
422 call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
423 } else {
424 let method = "reth_forkchoiceUpdated";
425 let reth_params = serde_json::to_value((forkchoice_state,))
426 .expect("ForkchoiceState serialization cannot fail");
427
428 debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
429
430 loop {
431 let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
432
433 if resp.is_valid() {
434 break Ok(resp)
435 }
436
437 if resp.is_invalid() {
438 error!(target: "reth-bench", ?resp, "Invalid {method}");
439 return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
440 std::io::Error::other(format!("Invalid {method}: {resp:?}")),
441 )))
442 }
443 if resp.is_syncing() {
444 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
445 "invalid range: no canonical state found for parent of requested block",
446 ))
447 }
448 }
449 }
450}