1use alloy_eips::eip7685::Requests;
6use alloy_primitives::B256;
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9 ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
10 ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use tracing::{debug, error};
16
17#[async_trait::async_trait]
19pub trait EngineApiValidWaitExt<N>: Send + Sync {
20 async fn fork_choice_updated_v1_wait(
23 &self,
24 fork_choice_state: ForkchoiceState,
25 payload_attributes: Option<PayloadAttributes>,
26 ) -> TransportResult<ForkchoiceUpdated>;
27
28 async fn fork_choice_updated_v2_wait(
31 &self,
32 fork_choice_state: ForkchoiceState,
33 payload_attributes: Option<PayloadAttributes>,
34 ) -> TransportResult<ForkchoiceUpdated>;
35
36 async fn fork_choice_updated_v3_wait(
39 &self,
40 fork_choice_state: ForkchoiceState,
41 payload_attributes: Option<PayloadAttributes>,
42 ) -> TransportResult<ForkchoiceUpdated>;
43}
44
45#[async_trait::async_trait]
46impl<N, P> EngineApiValidWaitExt<N> for P
47where
48 N: Network,
49 P: Provider<N> + EngineApi<N>,
50{
51 async fn fork_choice_updated_v1_wait(
52 &self,
53 fork_choice_state: ForkchoiceState,
54 payload_attributes: Option<PayloadAttributes>,
55 ) -> TransportResult<ForkchoiceUpdated> {
56 debug!(
57 target: "reth-bench",
58 method = "engine_forkchoiceUpdatedV1",
59 ?fork_choice_state,
60 ?payload_attributes,
61 "Sending forkchoiceUpdated"
62 );
63
64 let mut status =
65 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
66
67 while !status.is_valid() {
68 if status.is_invalid() {
69 error!(
70 target: "reth-bench",
71 ?status,
72 ?fork_choice_state,
73 ?payload_attributes,
74 "Invalid forkchoiceUpdatedV1 message",
75 );
76 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
77 }
78 if status.is_syncing() {
79 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
80 "invalid range: no canonical state found for parent of requested block",
81 ))
82 }
83 status =
84 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
85 }
86
87 Ok(status)
88 }
89
90 async fn fork_choice_updated_v2_wait(
91 &self,
92 fork_choice_state: ForkchoiceState,
93 payload_attributes: Option<PayloadAttributes>,
94 ) -> TransportResult<ForkchoiceUpdated> {
95 debug!(
96 target: "reth-bench",
97 method = "engine_forkchoiceUpdatedV2",
98 ?fork_choice_state,
99 ?payload_attributes,
100 "Sending forkchoiceUpdated"
101 );
102
103 let mut status =
104 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
105
106 while !status.is_valid() {
107 if status.is_invalid() {
108 error!(
109 target: "reth-bench",
110 ?status,
111 ?fork_choice_state,
112 ?payload_attributes,
113 "Invalid forkchoiceUpdatedV2 message",
114 );
115 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
116 }
117 if status.is_syncing() {
118 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
119 "invalid range: no canonical state found for parent of requested block",
120 ))
121 }
122 status =
123 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
124 }
125
126 Ok(status)
127 }
128
129 async fn fork_choice_updated_v3_wait(
130 &self,
131 fork_choice_state: ForkchoiceState,
132 payload_attributes: Option<PayloadAttributes>,
133 ) -> TransportResult<ForkchoiceUpdated> {
134 debug!(
135 target: "reth-bench",
136 method = "engine_forkchoiceUpdatedV3",
137 ?fork_choice_state,
138 ?payload_attributes,
139 "Sending forkchoiceUpdated"
140 );
141
142 let mut status =
143 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
144
145 while !status.is_valid() {
146 if status.is_invalid() {
147 error!(
148 target: "reth-bench",
149 ?status,
150 ?fork_choice_state,
151 ?payload_attributes,
152 "Invalid forkchoiceUpdatedV3 message",
153 );
154 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
155 }
156 status =
157 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
158 }
159
160 Ok(status)
161 }
162}
163
164pub(crate) fn block_to_new_payload(
165 block: AnyRpcBlock,
166 is_optimism: bool,
167) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
168 let block = block
169 .into_inner()
170 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
171 .try_map_transactions(|tx| {
172 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
174 })?
175 .into_consensus();
176
177 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
179 payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
180}
181
182pub(crate) fn payload_to_new_payload(
183 payload: ExecutionPayload,
184 sidecar: ExecutionPayloadSidecar,
185 is_optimism: bool,
186 withdrawals_root: Option<B256>,
187 target_version: Option<EngineApiMessageVersion>,
188) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
189 let (version, params) = match payload {
190 ExecutionPayload::V3(payload) => {
191 let cancun = sidecar.cancun().unwrap();
192
193 if let Some(prague) = sidecar.prague() {
194 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
196
197 if is_optimism {
198 let withdrawals_root = withdrawals_root.ok_or_else(|| {
199 eyre::eyre!("Missing withdrawals root for Optimism payload")
200 })?;
201 (
202 version,
203 serde_json::to_value((
204 OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
205 cancun.versioned_hashes.clone(),
206 cancun.parent_beacon_block_root,
207 Requests::default(),
208 ))?,
209 )
210 } else {
211 let requests = prague.requests.requests_hash();
213 (
214 version,
215 serde_json::to_value((
216 payload,
217 cancun.versioned_hashes.clone(),
218 cancun.parent_beacon_block_root,
219 requests,
220 ))?,
221 )
222 }
223 } else {
224 (
225 EngineApiMessageVersion::V3,
226 serde_json::to_value((
227 payload,
228 cancun.versioned_hashes.clone(),
229 cancun.parent_beacon_block_root,
230 ))?,
231 )
232 }
233 }
234 ExecutionPayload::V2(payload) => {
235 let input = ExecutionPayloadInputV2 {
236 execution_payload: payload.payload_inner,
237 withdrawals: Some(payload.withdrawals),
238 };
239
240 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
241 }
242 ExecutionPayload::V1(payload) => {
243 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
244 }
245 };
246
247 Ok((version, params))
248}
249
250pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
256 provider: P,
257 version: EngineApiMessageVersion,
258 params: serde_json::Value,
259) -> TransportResult<()> {
260 let method = version.method_name();
261
262 debug!(target: "reth-bench", method, "Sending newPayload");
263
264 let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?;
265
266 while !status.is_valid() {
267 if status.is_invalid() {
268 error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
269 return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
270 format!("Invalid {method}: {status:?}"),
271 ))))
272 }
273 if status.is_syncing() {
274 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
275 "invalid range: no canonical state found for parent of requested block",
276 ))
277 }
278 status = provider.client().request(method, ¶ms).await?;
279 }
280 Ok(())
281}
282
283pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
289 provider: P,
290 message_version: EngineApiMessageVersion,
291 forkchoice_state: ForkchoiceState,
292 payload_attributes: Option<PayloadAttributes>,
293) -> TransportResult<ForkchoiceUpdated> {
294 match message_version {
296 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
297 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
298 }
299 EngineApiMessageVersion::V2 => {
300 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
301 }
302 EngineApiMessageVersion::V1 => {
303 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
304 }
305 }
306}