1use alloy_eips::eip7685::Requests;
6use alloy_primitives::B256;
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9 ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
10 ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use tracing::{debug, error};
16
17#[async_trait::async_trait]
19pub trait EngineApiValidWaitExt<N>: Send + Sync {
20 async fn fork_choice_updated_v1_wait(
23 &self,
24 fork_choice_state: ForkchoiceState,
25 payload_attributes: Option<PayloadAttributes>,
26 ) -> TransportResult<ForkchoiceUpdated>;
27
28 async fn fork_choice_updated_v2_wait(
31 &self,
32 fork_choice_state: ForkchoiceState,
33 payload_attributes: Option<PayloadAttributes>,
34 ) -> TransportResult<ForkchoiceUpdated>;
35
36 async fn fork_choice_updated_v3_wait(
39 &self,
40 fork_choice_state: ForkchoiceState,
41 payload_attributes: Option<PayloadAttributes>,
42 ) -> TransportResult<ForkchoiceUpdated>;
43}
44
45#[async_trait::async_trait]
46impl<N, P> EngineApiValidWaitExt<N> for P
47where
48 N: Network,
49 P: Provider<N> + EngineApi<N>,
50{
51 async fn fork_choice_updated_v1_wait(
52 &self,
53 fork_choice_state: ForkchoiceState,
54 payload_attributes: Option<PayloadAttributes>,
55 ) -> TransportResult<ForkchoiceUpdated> {
56 debug!(
57 method = "engine_forkchoiceUpdatedV1",
58 ?fork_choice_state,
59 ?payload_attributes,
60 "Sending forkchoiceUpdated"
61 );
62
63 let mut status =
64 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
65
66 while !status.is_valid() {
67 if status.is_invalid() {
68 error!(
69 ?status,
70 ?fork_choice_state,
71 ?payload_attributes,
72 "Invalid forkchoiceUpdatedV1 message",
73 );
74 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
75 }
76 if status.is_syncing() {
77 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
78 "invalid range: no canonical state found for parent of requested block",
79 ))
80 }
81 status =
82 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
83 }
84
85 Ok(status)
86 }
87
88 async fn fork_choice_updated_v2_wait(
89 &self,
90 fork_choice_state: ForkchoiceState,
91 payload_attributes: Option<PayloadAttributes>,
92 ) -> TransportResult<ForkchoiceUpdated> {
93 debug!(
94 method = "engine_forkchoiceUpdatedV2",
95 ?fork_choice_state,
96 ?payload_attributes,
97 "Sending forkchoiceUpdated"
98 );
99
100 let mut status =
101 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
102
103 while !status.is_valid() {
104 if status.is_invalid() {
105 error!(
106 ?status,
107 ?fork_choice_state,
108 ?payload_attributes,
109 "Invalid forkchoiceUpdatedV2 message",
110 );
111 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
112 }
113 if status.is_syncing() {
114 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
115 "invalid range: no canonical state found for parent of requested block",
116 ))
117 }
118 status =
119 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
120 }
121
122 Ok(status)
123 }
124
125 async fn fork_choice_updated_v3_wait(
126 &self,
127 fork_choice_state: ForkchoiceState,
128 payload_attributes: Option<PayloadAttributes>,
129 ) -> TransportResult<ForkchoiceUpdated> {
130 debug!(
131 method = "engine_forkchoiceUpdatedV3",
132 ?fork_choice_state,
133 ?payload_attributes,
134 "Sending forkchoiceUpdated"
135 );
136
137 let mut status =
138 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
139
140 while !status.is_valid() {
141 if status.is_invalid() {
142 error!(
143 ?status,
144 ?fork_choice_state,
145 ?payload_attributes,
146 "Invalid forkchoiceUpdatedV3 message",
147 );
148 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
149 }
150 status =
151 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
152 }
153
154 Ok(status)
155 }
156}
157
158pub(crate) fn block_to_new_payload(
159 block: AnyRpcBlock,
160 is_optimism: bool,
161) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
162 let block = block
163 .into_inner()
164 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
165 .try_map_transactions(|tx| {
166 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
168 })?
169 .into_consensus();
170
171 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
173 payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
174}
175
176pub(crate) fn payload_to_new_payload(
177 payload: ExecutionPayload,
178 sidecar: ExecutionPayloadSidecar,
179 is_optimism: bool,
180 withdrawals_root: Option<B256>,
181 target_version: Option<EngineApiMessageVersion>,
182) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
183 let (version, params) = match payload {
184 ExecutionPayload::V3(payload) => {
185 let cancun = sidecar.cancun().unwrap();
186
187 if let Some(prague) = sidecar.prague() {
188 let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
190
191 if is_optimism {
192 let withdrawals_root = withdrawals_root.ok_or_else(|| {
193 eyre::eyre!("Missing withdrawals root for Optimism payload")
194 })?;
195 (
196 version,
197 serde_json::to_value((
198 OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
199 cancun.versioned_hashes.clone(),
200 cancun.parent_beacon_block_root,
201 Requests::default(),
202 ))?,
203 )
204 } else {
205 let requests = prague.requests.requests_hash();
207 (
208 version,
209 serde_json::to_value((
210 payload,
211 cancun.versioned_hashes.clone(),
212 cancun.parent_beacon_block_root,
213 requests,
214 ))?,
215 )
216 }
217 } else {
218 (
219 EngineApiMessageVersion::V3,
220 serde_json::to_value((
221 payload,
222 cancun.versioned_hashes.clone(),
223 cancun.parent_beacon_block_root,
224 ))?,
225 )
226 }
227 }
228 ExecutionPayload::V2(payload) => {
229 let input = ExecutionPayloadInputV2 {
230 execution_payload: payload.payload_inner,
231 withdrawals: Some(payload.withdrawals),
232 };
233
234 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
235 }
236 ExecutionPayload::V1(payload) => {
237 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
238 }
239 };
240
241 Ok((version, params))
242}
243
244pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
250 provider: P,
251 version: EngineApiMessageVersion,
252 params: serde_json::Value,
253) -> TransportResult<()> {
254 let method = version.method_name();
255
256 debug!(method, "Sending newPayload");
257
258 let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?;
259
260 while !status.is_valid() {
261 if status.is_invalid() {
262 error!(?status, ?params, "Invalid {method}",);
263 panic!("Invalid {method}: {status:?}");
264 }
265 if status.is_syncing() {
266 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
267 "invalid range: no canonical state found for parent of requested block",
268 ))
269 }
270 status = provider.client().request(method, ¶ms).await?;
271 }
272 Ok(())
273}
274
275pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
281 provider: P,
282 message_version: EngineApiMessageVersion,
283 forkchoice_state: ForkchoiceState,
284 payload_attributes: Option<PayloadAttributes>,
285) -> TransportResult<ForkchoiceUpdated> {
286 match message_version {
288 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
289 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
290 }
291 EngineApiMessageVersion::V2 => {
292 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
293 }
294 EngineApiMessageVersion::V1 => {
295 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
296 }
297 }
298}