1use alloy_consensus::TxEnvelope;
6use alloy_primitives::Bytes;
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9 ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
10 ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use reth_node_api::EngineApiMessageVersion;
14use reth_node_core::args::WaitForPersistence;
15use reth_rpc_api::RethNewPayloadInput;
16use serde::Deserialize;
17use std::time::Duration;
18use tracing::{debug, error};
19
20#[async_trait::async_trait]
22pub trait EngineApiValidWaitExt<N>: Send + Sync {
23 async fn fork_choice_updated_v1_wait(
26 &self,
27 fork_choice_state: ForkchoiceState,
28 payload_attributes: Option<PayloadAttributes>,
29 ) -> TransportResult<ForkchoiceUpdated>;
30
31 async fn fork_choice_updated_v2_wait(
34 &self,
35 fork_choice_state: ForkchoiceState,
36 payload_attributes: Option<PayloadAttributes>,
37 ) -> TransportResult<ForkchoiceUpdated>;
38
39 async fn fork_choice_updated_v3_wait(
42 &self,
43 fork_choice_state: ForkchoiceState,
44 payload_attributes: Option<PayloadAttributes>,
45 ) -> TransportResult<ForkchoiceUpdated>;
46}
47
48#[async_trait::async_trait]
49impl<N, P> EngineApiValidWaitExt<N> for P
50where
51 N: Network,
52 P: Provider<N> + EngineApi<N>,
53{
54 async fn fork_choice_updated_v1_wait(
55 &self,
56 fork_choice_state: ForkchoiceState,
57 payload_attributes: Option<PayloadAttributes>,
58 ) -> TransportResult<ForkchoiceUpdated> {
59 debug!(
60 target: "reth-bench",
61 method = "engine_forkchoiceUpdatedV1",
62 ?fork_choice_state,
63 ?payload_attributes,
64 "Sending forkchoiceUpdated"
65 );
66
67 let mut status =
68 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
69
70 while !status.is_valid() {
71 if status.is_invalid() {
72 error!(
73 target: "reth-bench",
74 ?status,
75 ?fork_choice_state,
76 ?payload_attributes,
77 "Invalid forkchoiceUpdatedV1 message",
78 );
79 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
80 }
81 if status.is_syncing() {
82 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
83 "invalid range: no canonical state found for parent of requested block",
84 ))
85 }
86 status =
87 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
88 }
89
90 Ok(status)
91 }
92
93 async fn fork_choice_updated_v2_wait(
94 &self,
95 fork_choice_state: ForkchoiceState,
96 payload_attributes: Option<PayloadAttributes>,
97 ) -> TransportResult<ForkchoiceUpdated> {
98 debug!(
99 target: "reth-bench",
100 method = "engine_forkchoiceUpdatedV2",
101 ?fork_choice_state,
102 ?payload_attributes,
103 "Sending forkchoiceUpdated"
104 );
105
106 let mut status =
107 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
108
109 while !status.is_valid() {
110 if status.is_invalid() {
111 error!(
112 target: "reth-bench",
113 ?status,
114 ?fork_choice_state,
115 ?payload_attributes,
116 "Invalid forkchoiceUpdatedV2 message",
117 );
118 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
119 }
120 if status.is_syncing() {
121 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
122 "invalid range: no canonical state found for parent of requested block",
123 ))
124 }
125 status =
126 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
127 }
128
129 Ok(status)
130 }
131
132 async fn fork_choice_updated_v3_wait(
133 &self,
134 fork_choice_state: ForkchoiceState,
135 payload_attributes: Option<PayloadAttributes>,
136 ) -> TransportResult<ForkchoiceUpdated> {
137 debug!(
138 target: "reth-bench",
139 method = "engine_forkchoiceUpdatedV3",
140 ?fork_choice_state,
141 ?payload_attributes,
142 "Sending forkchoiceUpdated"
143 );
144
145 let mut status =
146 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
147
148 while !status.is_valid() {
149 if status.is_invalid() {
150 error!(
151 target: "reth-bench",
152 ?status,
153 ?fork_choice_state,
154 ?payload_attributes,
155 "Invalid forkchoiceUpdatedV3 message",
156 );
157 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
158 }
159 status =
160 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
161 }
162
163 Ok(status)
164 }
165}
166
167pub(crate) fn block_to_new_payload(
174 block: AnyRpcBlock,
175 rlp: Option<Bytes>,
176 reth_new_payload: bool,
177 wait_for_persistence: WaitForPersistence,
178 no_wait_for_caches: bool,
179) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
180 let block_number = block.header.number;
181 let wait_for_persistence = wait_for_persistence.rpc_value(block_number);
182
183 if let Some(rlp) = rlp {
184 return Ok((
185 None,
186 serde_json::to_value((
187 RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),
188 wait_for_persistence,
189 no_wait_for_caches.then_some(false),
190 ))?,
191 ));
192 }
193
194 let block = block
195 .into_inner()
196 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
197 .try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
198 tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
199 })?
200 .into_consensus();
201 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
202 let (version, params, execution_data) = payload_to_new_payload(payload, sidecar, None)?;
203
204 if reth_new_payload {
205 Ok((
206 None,
207 serde_json::to_value((
208 RethNewPayloadInput::ExecutionData(execution_data),
209 wait_for_persistence,
210 no_wait_for_caches.then_some(false),
211 ))?,
212 ))
213 } else {
214 Ok((Some(version), params))
215 }
216}
217
218pub(crate) fn payload_to_new_payload(
223 payload: ExecutionPayload,
224 sidecar: ExecutionPayloadSidecar,
225 target_version: Option<EngineApiMessageVersion>,
226) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
227 let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
228
229 let (version, params) = match payload {
230 ExecutionPayload::V3(payload) => {
231 let cancun = sidecar
232 .cancun()
233 .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V3 payload"))?;
234
235 if let Some(prague) = sidecar.prague() {
236 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
237 let requests = prague.requests.clone();
238 (
239 version,
240 serde_json::to_value((
241 payload,
242 cancun.versioned_hashes.clone(),
243 cancun.parent_beacon_block_root,
244 requests,
245 ))?,
246 )
247 } else {
248 (
249 EngineApiMessageVersion::V3,
250 serde_json::to_value((
251 payload,
252 cancun.versioned_hashes.clone(),
253 cancun.parent_beacon_block_root,
254 ))?,
255 )
256 }
257 }
258 ExecutionPayload::V2(payload) => {
259 let input = ExecutionPayloadInputV2 {
260 execution_payload: payload.payload_inner,
261 withdrawals: Some(payload.withdrawals),
262 };
263
264 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
265 }
266 ExecutionPayload::V1(payload) => {
267 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
268 }
269 ExecutionPayload::V4(payload) => {
270 let cancun = sidecar
271 .cancun()
272 .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V4 payload"))?;
273 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
274 let requests = sidecar.prague().map(|p| p.requests.clone()).unwrap_or_default();
275 (
276 version,
277 serde_json::to_value((
278 payload,
279 cancun.versioned_hashes.clone(),
280 cancun.parent_beacon_block_root,
281 requests,
282 ))?,
283 )
284 }
285 };
286
287 Ok((version, params, execution_data))
288}
289
290#[allow(dead_code)]
296pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
297 provider: P,
298 version: Option<EngineApiMessageVersion>,
299 params: serde_json::Value,
300) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
301 call_new_payload_with_reth(provider, version, params).await
302}
303
304#[derive(Debug, Deserialize)]
306struct RethPayloadStatus {
307 latency_us: u64,
308 #[serde(default)]
309 persistence_wait_us: u64,
310 #[serde(default)]
311 execution_cache_wait_us: u64,
312 #[serde(default)]
313 sparse_trie_wait_us: u64,
314}
315
/// Timing breakdown reported for a single `reth_newPayload` call.
///
/// Every field defaults to a zero duration.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
    /// Latency, taken from the response's `latency_us` field.
    pub(crate) latency: Duration,
    /// Time attributed to waiting on persistence (`persistence_wait_us`).
    pub(crate) persistence_wait: Duration,
    /// Time attributed to waiting on the execution cache (`execution_cache_wait_us`).
    pub(crate) execution_cache_wait: Duration,
    /// Time attributed to waiting on the sparse trie (`sparse_trie_wait_us`).
    pub(crate) sparse_trie_wait: Duration,
}
328
329pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
337 provider: P,
338 version: Option<EngineApiMessageVersion>,
339 params: serde_json::Value,
340) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
341 let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
342
343 debug!(target: "reth-bench", method, "Sending newPayload");
344
345 let resp = loop {
346 let resp: serde_json::Value = provider.client().request(method, ¶ms).await?;
347 let status = PayloadStatus::deserialize(&resp)?;
348
349 if status.is_valid() {
350 break resp;
351 }
352 if status.is_invalid() {
353 return Err(eyre::eyre!("Invalid {method}: {status:?}"));
354 }
355 if status.is_syncing() {
356 return Err(eyre::eyre!(
357 "invalid range: no canonical state found for parent of requested block"
358 ));
359 }
360 };
361
362 if version.is_some() {
363 return Ok(None);
364 }
365
366 let resp: RethPayloadStatus = serde_json::from_value(resp)?;
367
368 Ok(Some(NewPayloadTimingBreakdown {
369 latency: Duration::from_micros(resp.latency_us),
370 persistence_wait: Duration::from_micros(resp.persistence_wait_us),
371 execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
372 sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
373 }))
374}
375
376pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
382 provider: P,
383 message_version: EngineApiMessageVersion,
384 forkchoice_state: ForkchoiceState,
385 payload_attributes: Option<PayloadAttributes>,
386) -> TransportResult<ForkchoiceUpdated> {
387 match message_version {
389 EngineApiMessageVersion::V3 |
390 EngineApiMessageVersion::V4 |
391 EngineApiMessageVersion::V5 |
392 EngineApiMessageVersion::V6 => {
393 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
394 }
395 EngineApiMessageVersion::V2 => {
396 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
397 }
398 EngineApiMessageVersion::V1 => {
399 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
400 }
401 }
402}
403
404pub(crate) async fn call_forkchoice_updated_with_reth<
410 N: Network,
411 P: Provider<N> + EngineApiValidWaitExt<N>,
412>(
413 provider: P,
414 message_version: Option<EngineApiMessageVersion>,
415 forkchoice_state: ForkchoiceState,
416) -> TransportResult<ForkchoiceUpdated> {
417 if let Some(message_version) = message_version {
418 call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
419 } else {
420 let method = "reth_forkchoiceUpdated";
421 let reth_params = serde_json::to_value((forkchoice_state,))
422 .expect("ForkchoiceState serialization cannot fail");
423
424 debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
425
426 loop {
427 let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
428
429 if resp.is_valid() {
430 break Ok(resp)
431 }
432
433 if resp.is_invalid() {
434 error!(target: "reth-bench", ?resp, "Invalid {method}");
435 return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
436 std::io::Error::other(format!("Invalid {method}: {resp:?}")),
437 )))
438 }
439 if resp.is_syncing() {
440 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
441 "invalid range: no canonical state found for parent of requested block",
442 ))
443 }
444 }
445 }
446}