1use alloy_eips::eip7685::Requests;
6use alloy_primitives::{Bytes, B256};
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9 ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
10 ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use reth_rpc_api::RethNewPayloadInput;
16use serde::Deserialize;
17use std::time::Duration;
18use tracing::{debug, error};
19
/// Extension trait for engine API providers whose `forkchoiceUpdated` calls resolve only
/// once the engine has reported a valid status.
///
/// The type parameter `N` is the network the underlying provider is bound to. The trait
/// requires `Send + Sync` so implementors can be shared across async tasks.
#[async_trait::async_trait]
pub trait EngineApiValidWaitExt<N>: Send + Sync {
    /// Sends a V1 `forkchoiceUpdated` request with the given [`ForkchoiceState`] and optional
    /// [`PayloadAttributes`], and waits until the response is valid.
    ///
    /// Transport failures are surfaced through the returned [`TransportResult`].
    async fn fork_choice_updated_v1_wait(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<PayloadAttributes>,
    ) -> TransportResult<ForkchoiceUpdated>;

    /// Sends a V2 `forkchoiceUpdated` request with the given [`ForkchoiceState`] and optional
    /// [`PayloadAttributes`], and waits until the response is valid.
    ///
    /// Transport failures are surfaced through the returned [`TransportResult`].
    async fn fork_choice_updated_v2_wait(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<PayloadAttributes>,
    ) -> TransportResult<ForkchoiceUpdated>;

    /// Sends a V3 `forkchoiceUpdated` request with the given [`ForkchoiceState`] and optional
    /// [`PayloadAttributes`], and waits until the response is valid.
    ///
    /// Transport failures are surfaced through the returned [`TransportResult`].
    async fn fork_choice_updated_v3_wait(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<PayloadAttributes>,
    ) -> TransportResult<ForkchoiceUpdated>;
}
47
48#[async_trait::async_trait]
49impl<N, P> EngineApiValidWaitExt<N> for P
50where
51 N: Network,
52 P: Provider<N> + EngineApi<N>,
53{
54 async fn fork_choice_updated_v1_wait(
55 &self,
56 fork_choice_state: ForkchoiceState,
57 payload_attributes: Option<PayloadAttributes>,
58 ) -> TransportResult<ForkchoiceUpdated> {
59 debug!(
60 target: "reth-bench",
61 method = "engine_forkchoiceUpdatedV1",
62 ?fork_choice_state,
63 ?payload_attributes,
64 "Sending forkchoiceUpdated"
65 );
66
67 let mut status =
68 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
69
70 while !status.is_valid() {
71 if status.is_invalid() {
72 error!(
73 target: "reth-bench",
74 ?status,
75 ?fork_choice_state,
76 ?payload_attributes,
77 "Invalid forkchoiceUpdatedV1 message",
78 );
79 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
80 }
81 if status.is_syncing() {
82 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
83 "invalid range: no canonical state found for parent of requested block",
84 ))
85 }
86 status =
87 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
88 }
89
90 Ok(status)
91 }
92
93 async fn fork_choice_updated_v2_wait(
94 &self,
95 fork_choice_state: ForkchoiceState,
96 payload_attributes: Option<PayloadAttributes>,
97 ) -> TransportResult<ForkchoiceUpdated> {
98 debug!(
99 target: "reth-bench",
100 method = "engine_forkchoiceUpdatedV2",
101 ?fork_choice_state,
102 ?payload_attributes,
103 "Sending forkchoiceUpdated"
104 );
105
106 let mut status =
107 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
108
109 while !status.is_valid() {
110 if status.is_invalid() {
111 error!(
112 target: "reth-bench",
113 ?status,
114 ?fork_choice_state,
115 ?payload_attributes,
116 "Invalid forkchoiceUpdatedV2 message",
117 );
118 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
119 }
120 if status.is_syncing() {
121 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
122 "invalid range: no canonical state found for parent of requested block",
123 ))
124 }
125 status =
126 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
127 }
128
129 Ok(status)
130 }
131
132 async fn fork_choice_updated_v3_wait(
133 &self,
134 fork_choice_state: ForkchoiceState,
135 payload_attributes: Option<PayloadAttributes>,
136 ) -> TransportResult<ForkchoiceUpdated> {
137 debug!(
138 target: "reth-bench",
139 method = "engine_forkchoiceUpdatedV3",
140 ?fork_choice_state,
141 ?payload_attributes,
142 "Sending forkchoiceUpdated"
143 );
144
145 let mut status =
146 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
147
148 while !status.is_valid() {
149 if status.is_invalid() {
150 error!(
151 target: "reth-bench",
152 ?status,
153 ?fork_choice_state,
154 ?payload_attributes,
155 "Invalid forkchoiceUpdatedV3 message",
156 );
157 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
158 }
159 status =
160 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
161 }
162
163 Ok(status)
164 }
165}
166
167pub(crate) fn block_to_new_payload(
171 block: AnyRpcBlock,
172 is_optimism: bool,
173 rlp: Option<Bytes>,
174 reth_new_payload: bool,
175) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
176 if let Some(rlp) = rlp {
177 return Ok((
178 None,
179 serde_json::to_value((RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),))?,
180 ));
181 }
182 let block = block
183 .into_inner()
184 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
185 .try_map_transactions(|tx| {
186 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
188 })?
189 .into_consensus();
190
191 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
193 let (version, params, execution_data) =
194 payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
195
196 if reth_new_payload {
197 Ok((None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?))
198 } else {
199 Ok((Some(version), params))
200 }
201}
202
203pub(crate) fn payload_to_new_payload(
208 payload: ExecutionPayload,
209 sidecar: ExecutionPayloadSidecar,
210 is_optimism: bool,
211 withdrawals_root: Option<B256>,
212 target_version: Option<EngineApiMessageVersion>,
213) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
214 let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
215
216 let (version, params) = match payload {
217 ExecutionPayload::V3(payload) => {
218 let cancun = sidecar.cancun().unwrap();
219
220 if let Some(prague) = sidecar.prague() {
221 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
223
224 if is_optimism {
225 let withdrawals_root = withdrawals_root.ok_or_else(|| {
226 eyre::eyre!("Missing withdrawals root for Optimism payload")
227 })?;
228 (
229 version,
230 serde_json::to_value((
231 OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
232 cancun.versioned_hashes.clone(),
233 cancun.parent_beacon_block_root,
234 Requests::default(),
235 ))?,
236 )
237 } else {
238 let requests = prague.requests.requests_hash();
240 (
241 version,
242 serde_json::to_value((
243 payload,
244 cancun.versioned_hashes.clone(),
245 cancun.parent_beacon_block_root,
246 requests,
247 ))?,
248 )
249 }
250 } else {
251 (
252 EngineApiMessageVersion::V3,
253 serde_json::to_value((
254 payload,
255 cancun.versioned_hashes.clone(),
256 cancun.parent_beacon_block_root,
257 ))?,
258 )
259 }
260 }
261 ExecutionPayload::V2(payload) => {
262 let input = ExecutionPayloadInputV2 {
263 execution_payload: payload.payload_inner,
264 withdrawals: Some(payload.withdrawals),
265 };
266
267 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
268 }
269 ExecutionPayload::V1(payload) => {
270 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
271 }
272 };
273
274 Ok((version, params, execution_data))
275}
276
277#[allow(dead_code)]
283pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
284 provider: P,
285 version: Option<EngineApiMessageVersion>,
286 params: serde_json::Value,
287) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
288 call_new_payload_with_reth(provider, version, params).await
289}
290
/// Timing fields read from the JSON response of a `reth_newPayload` call.
///
/// All values are in microseconds. `latency_us` is required; the remaining fields fall back
/// to their default (`None` / `0`) when absent from the response.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
    /// Reported latency, in microseconds.
    latency_us: u64,
    /// Reported persistence wait, in microseconds; `None` when not present.
    #[serde(default)]
    persistence_wait_us: Option<u64>,
    /// Reported execution cache wait, in microseconds; `0` when not present.
    #[serde(default)]
    execution_cache_wait_us: u64,
    /// Reported sparse trie wait, in microseconds; `0` when not present.
    #[serde(default)]
    sparse_trie_wait_us: u64,
}
302
/// Timing breakdown of a `reth_newPayload` call, with each value converted from the
/// microsecond counts in the response into a [`Duration`].
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
    /// Latency reported by the server.
    pub(crate) latency: Duration,
    /// Persistence wait reported by the server; `None` when the response did not include it.
    pub(crate) persistence_wait: Option<Duration>,
    /// Execution cache wait reported by the server.
    pub(crate) execution_cache_wait: Duration,
    /// Sparse trie wait reported by the server.
    pub(crate) sparse_trie_wait: Duration,
}
315
316pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
324 provider: P,
325 version: Option<EngineApiMessageVersion>,
326 params: serde_json::Value,
327) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
328 let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
329
330 debug!(target: "reth-bench", method, "Sending newPayload");
331
332 let resp = loop {
333 let resp: serde_json::Value = provider.client().request(method, ¶ms).await?;
334 let status = PayloadStatus::deserialize(&resp)?;
335
336 if status.is_valid() {
337 break resp;
338 }
339 if status.is_invalid() {
340 return Err(eyre::eyre!("Invalid {method}: {status:?}"));
341 }
342 if status.is_syncing() {
343 return Err(eyre::eyre!(
344 "invalid range: no canonical state found for parent of requested block"
345 ));
346 }
347 };
348
349 if version.is_some() {
350 return Ok(None);
351 }
352
353 let resp: RethPayloadStatus = serde_json::from_value(resp)?;
354
355 Ok(Some(NewPayloadTimingBreakdown {
356 latency: Duration::from_micros(resp.latency_us),
357 persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
358 execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
359 sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
360 }))
361}
362
363pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
369 provider: P,
370 message_version: EngineApiMessageVersion,
371 forkchoice_state: ForkchoiceState,
372 payload_attributes: Option<PayloadAttributes>,
373) -> TransportResult<ForkchoiceUpdated> {
374 match message_version {
376 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
377 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
378 }
379 EngineApiMessageVersion::V2 => {
380 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
381 }
382 EngineApiMessageVersion::V1 => {
383 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
384 }
385 }
386}
387
388pub(crate) async fn call_forkchoice_updated_with_reth<
394 N: Network,
395 P: Provider<N> + EngineApiValidWaitExt<N>,
396>(
397 provider: P,
398 message_version: Option<EngineApiMessageVersion>,
399 forkchoice_state: ForkchoiceState,
400) -> TransportResult<ForkchoiceUpdated> {
401 if let Some(message_version) = message_version {
402 call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
403 } else {
404 let method = "reth_forkchoiceUpdated";
405 let reth_params = serde_json::to_value((forkchoice_state,))
406 .expect("ForkchoiceState serialization cannot fail");
407
408 debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
409
410 loop {
411 let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
412
413 if resp.is_valid() {
414 break Ok(resp)
415 }
416
417 if resp.is_invalid() {
418 error!(target: "reth-bench", ?resp, "Invalid {method}");
419 return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
420 std::io::Error::other(format!("Invalid {method}: {resp:?}")),
421 )))
422 }
423 if resp.is_syncing() {
424 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
425 "invalid range: no canonical state found for parent of requested block",
426 ))
427 }
428 }
429 }
430}