1use alloy_consensus::TxEnvelope;
6use alloy_eips::eip7928::BlockAccessList;
7use alloy_primitives::Bytes;
8use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
9use alloy_rpc_types_engine::{
10 ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
11 ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
12};
13use alloy_transport::TransportResult;
14use reth_node_api::EngineApiMessageVersion;
15use reth_node_core::args::WaitForPersistence;
16use reth_rpc_api::RethNewPayloadInput;
17use serde::Deserialize;
18use std::time::Duration;
19use tracing::{debug, error};
20
21#[async_trait::async_trait]
23pub trait EngineApiValidWaitExt<N>: Send + Sync {
24 async fn fork_choice_updated_v1_wait(
27 &self,
28 fork_choice_state: ForkchoiceState,
29 payload_attributes: Option<PayloadAttributes>,
30 ) -> TransportResult<ForkchoiceUpdated>;
31
32 async fn fork_choice_updated_v2_wait(
35 &self,
36 fork_choice_state: ForkchoiceState,
37 payload_attributes: Option<PayloadAttributes>,
38 ) -> TransportResult<ForkchoiceUpdated>;
39
40 async fn fork_choice_updated_v3_wait(
43 &self,
44 fork_choice_state: ForkchoiceState,
45 payload_attributes: Option<PayloadAttributes>,
46 ) -> TransportResult<ForkchoiceUpdated>;
47
48 async fn fork_choice_updated_v4_wait(
51 &self,
52 fork_choice_state: ForkchoiceState,
53 payload_attributes: Option<PayloadAttributes>,
54 ) -> TransportResult<ForkchoiceUpdated>;
55}
56
57#[async_trait::async_trait]
58impl<N, P> EngineApiValidWaitExt<N> for P
59where
60 N: Network,
61 P: Provider<N> + EngineApi<N>,
62{
63 async fn fork_choice_updated_v1_wait(
64 &self,
65 fork_choice_state: ForkchoiceState,
66 payload_attributes: Option<PayloadAttributes>,
67 ) -> TransportResult<ForkchoiceUpdated> {
68 debug!(
69 target: "reth-bench",
70 method = "engine_forkchoiceUpdatedV1",
71 ?fork_choice_state,
72 ?payload_attributes,
73 "Sending forkchoiceUpdated"
74 );
75
76 let mut status =
77 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
78
79 while !status.is_valid() {
80 if status.is_invalid() {
81 error!(
82 target: "reth-bench",
83 ?status,
84 ?fork_choice_state,
85 ?payload_attributes,
86 "Invalid forkchoiceUpdatedV1 message",
87 );
88 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
89 }
90 if status.is_syncing() {
91 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
92 "invalid range: no canonical state found for parent of requested block",
93 ))
94 }
95 status =
96 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
97 }
98
99 Ok(status)
100 }
101
102 async fn fork_choice_updated_v2_wait(
103 &self,
104 fork_choice_state: ForkchoiceState,
105 payload_attributes: Option<PayloadAttributes>,
106 ) -> TransportResult<ForkchoiceUpdated> {
107 debug!(
108 target: "reth-bench",
109 method = "engine_forkchoiceUpdatedV2",
110 ?fork_choice_state,
111 ?payload_attributes,
112 "Sending forkchoiceUpdated"
113 );
114
115 let mut status =
116 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
117
118 while !status.is_valid() {
119 if status.is_invalid() {
120 error!(
121 target: "reth-bench",
122 ?status,
123 ?fork_choice_state,
124 ?payload_attributes,
125 "Invalid forkchoiceUpdatedV2 message",
126 );
127 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
128 }
129 if status.is_syncing() {
130 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
131 "invalid range: no canonical state found for parent of requested block",
132 ))
133 }
134 status =
135 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
136 }
137
138 Ok(status)
139 }
140
141 async fn fork_choice_updated_v3_wait(
142 &self,
143 fork_choice_state: ForkchoiceState,
144 payload_attributes: Option<PayloadAttributes>,
145 ) -> TransportResult<ForkchoiceUpdated> {
146 debug!(
147 target: "reth-bench",
148 method = "engine_forkchoiceUpdatedV3",
149 ?fork_choice_state,
150 ?payload_attributes,
151 "Sending forkchoiceUpdated"
152 );
153
154 let mut status =
155 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
156
157 while !status.is_valid() {
158 if status.is_invalid() {
159 error!(
160 target: "reth-bench",
161 ?status,
162 ?fork_choice_state,
163 ?payload_attributes,
164 "Invalid forkchoiceUpdatedV3 message",
165 );
166 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
167 }
168 status =
169 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
170 }
171
172 Ok(status)
173 }
174
175 async fn fork_choice_updated_v4_wait(
176 &self,
177 fork_choice_state: ForkchoiceState,
178 payload_attributes: Option<PayloadAttributes>,
179 ) -> TransportResult<ForkchoiceUpdated> {
180 debug!(
181 target: "reth-bench",
182 method = "engine_forkchoiceUpdatedV3",
183 ?fork_choice_state,
184 ?payload_attributes,
185 "Sending forkchoiceUpdated"
186 );
187
188 let mut status =
189 self.fork_choice_updated_v4(fork_choice_state, payload_attributes.clone()).await?;
190
191 while !status.is_valid() {
192 if status.is_invalid() {
193 error!(
194 target: "reth-bench",
195 ?status,
196 ?fork_choice_state,
197 ?payload_attributes,
198 "Invalid forkchoiceUpdatedV4 message",
199 );
200 panic!("Invalid forkchoiceUpdatedV4: {status:?}");
201 }
202 status =
203 self.fork_choice_updated_v4(fork_choice_state, payload_attributes.clone()).await?;
204 }
205
206 Ok(status)
207 }
208}
209
210pub(crate) fn block_to_new_payload(
217 block: AnyRpcBlock,
218 rlp: Option<Bytes>,
219 reth_new_payload: bool,
220 wait_for_persistence: WaitForPersistence,
221 no_wait_for_caches: bool,
222 bal: Option<BlockAccessList>,
223) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
224 let block_number = block.header.number;
225 let wait_for_persistence = wait_for_persistence.rpc_value(block_number);
226
227 if let Some(rlp) = rlp {
228 let bal = bal.map(|bal| alloy_rlp::encode(bal).into());
229 return Ok((
230 None,
231 serde_json::to_value((
232 RethNewPayloadInput::<ExecutionData>::BlockRlp { block: rlp, bal },
233 wait_for_persistence,
234 no_wait_for_caches.then_some(false),
235 ))?,
236 ));
237 }
238
239 let block = block
240 .into_inner()
241 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
242 .try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
243 tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
244 })?
245 .into_consensus();
246
247 let block_access_list = alloy_rlp::encode(bal.unwrap_or_default());
248
249 let (payload, sidecar) =
250 ExecutionPayload::from_block_slow_with_bal(&block, block_access_list.into());
251 let (version, params, execution_data) = payload_to_new_payload(payload, sidecar, None)?;
252
253 if reth_new_payload {
254 Ok((
255 None,
256 serde_json::to_value((
257 RethNewPayloadInput::ExecutionData(execution_data),
258 wait_for_persistence,
259 no_wait_for_caches.then_some(false),
260 ))?,
261 ))
262 } else {
263 Ok((Some(version), params))
264 }
265}
266
267pub(crate) fn payload_to_new_payload(
272 payload: ExecutionPayload,
273 sidecar: ExecutionPayloadSidecar,
274 target_version: Option<EngineApiMessageVersion>,
275) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
276 let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
277
278 let (version, params) = match payload {
279 ExecutionPayload::V4(payload) => {
280 let cancun = sidecar
281 .cancun()
282 .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V4 payload"))?;
283 let version = target_version.unwrap_or(EngineApiMessageVersion::V6);
284 let requests = sidecar.prague().map(|p| p.requests.clone()).unwrap_or_default();
285 (
286 version,
287 serde_json::to_value((
288 payload,
289 cancun.versioned_hashes.clone(),
290 cancun.parent_beacon_block_root,
291 requests,
292 ))?,
293 )
294 }
295 ExecutionPayload::V3(payload) => {
296 let cancun = sidecar
297 .cancun()
298 .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V3 payload"))?;
299
300 if let Some(prague) = sidecar.prague() {
301 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
302 let requests = prague.requests.clone();
303 (
304 version,
305 serde_json::to_value((
306 payload,
307 cancun.versioned_hashes.clone(),
308 cancun.parent_beacon_block_root,
309 requests,
310 ))?,
311 )
312 } else {
313 (
314 EngineApiMessageVersion::V3,
315 serde_json::to_value((
316 payload,
317 cancun.versioned_hashes.clone(),
318 cancun.parent_beacon_block_root,
319 ))?,
320 )
321 }
322 }
323 ExecutionPayload::V2(payload) => {
324 let input = ExecutionPayloadInputV2 {
325 execution_payload: payload.payload_inner,
326 withdrawals: Some(payload.withdrawals),
327 };
328
329 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
330 }
331 ExecutionPayload::V1(payload) => {
332 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
333 }
334 };
335
336 Ok((version, params, execution_data))
337}
338
339#[allow(dead_code)]
345pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
346 provider: P,
347 version: Option<EngineApiMessageVersion>,
348 params: serde_json::Value,
349) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
350 call_new_payload_with_reth(provider, version, params).await
351}
352
353#[derive(Debug, Deserialize)]
355struct RethPayloadStatus {
356 latency_us: u64,
357 #[serde(default)]
358 persistence_wait_us: u64,
359 #[serde(default)]
360 execution_cache_wait_us: u64,
361 #[serde(default)]
362 sparse_trie_wait_us: u64,
363}
364
365#[derive(Debug, Clone, Copy, Default)]
367pub(crate) struct NewPayloadTimingBreakdown {
368 pub(crate) latency: Duration,
370 pub(crate) persistence_wait: Duration,
372 pub(crate) execution_cache_wait: Duration,
374 pub(crate) sparse_trie_wait: Duration,
376}
377
378pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
386 provider: P,
387 version: Option<EngineApiMessageVersion>,
388 params: serde_json::Value,
389) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
390 let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
391
392 debug!(target: "reth-bench", method, "Sending newPayload");
393
394 let resp = loop {
395 let resp: serde_json::Value = provider.client().request(method, ¶ms).await?;
396 let status = PayloadStatus::deserialize(&resp)?;
397
398 if status.is_valid() {
399 break resp;
400 }
401 if status.is_invalid() {
402 return Err(eyre::eyre!("Invalid {method}: {status:?}"));
403 }
404 if status.is_syncing() {
405 return Err(eyre::eyre!(
406 "invalid range: no canonical state found for parent of requested block"
407 ));
408 }
409 };
410
411 if version.is_some() {
412 return Ok(None);
413 }
414
415 let resp: RethPayloadStatus = serde_json::from_value(resp)?;
416
417 Ok(Some(NewPayloadTimingBreakdown {
418 latency: Duration::from_micros(resp.latency_us),
419 persistence_wait: Duration::from_micros(resp.persistence_wait_us),
420 execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
421 sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
422 }))
423}
424
425pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
431 provider: P,
432 message_version: EngineApiMessageVersion,
433 forkchoice_state: ForkchoiceState,
434 payload_attributes: Option<PayloadAttributes>,
435) -> TransportResult<ForkchoiceUpdated> {
436 match message_version {
438 EngineApiMessageVersion::V6 => {
439 provider.fork_choice_updated_v4_wait(forkchoice_state, payload_attributes).await
440 }
441 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
442 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
443 }
444 EngineApiMessageVersion::V2 => {
445 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
446 }
447 EngineApiMessageVersion::V1 => {
448 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
449 }
450 }
451}
452
453pub(crate) async fn call_forkchoice_updated_with_reth<
459 N: Network,
460 P: Provider<N> + EngineApiValidWaitExt<N>,
461>(
462 provider: P,
463 message_version: Option<EngineApiMessageVersion>,
464 forkchoice_state: ForkchoiceState,
465) -> TransportResult<ForkchoiceUpdated> {
466 if let Some(message_version) = message_version {
467 call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
468 } else {
469 let method = "reth_forkchoiceUpdated";
470 let reth_params = serde_json::to_value((forkchoice_state,))
471 .expect("ForkchoiceState serialization cannot fail");
472
473 debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
474
475 loop {
476 let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
477
478 if resp.is_valid() {
479 break Ok(resp)
480 }
481
482 if resp.is_invalid() {
483 error!(target: "reth-bench", ?resp, "Invalid {method}");
484 return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
485 std::io::Error::other(format!("Invalid {method}: {resp:?}")),
486 )))
487 }
488 if resp.is_syncing() {
489 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
490 "invalid range: no canonical state found for parent of requested block",
491 ))
492 }
493 }
494 }
495}