1use alloy_eips::eip7685::RequestsOrHash;
6use alloy_primitives::B256;
7use alloy_provider::{ext::EngineApi, Network, Provider};
8use alloy_rpc_types_engine::{
9 ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
10 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use reth_node_api::EngineApiMessageVersion;
14use tracing::error;
15
16#[async_trait::async_trait]
18pub trait EngineApiValidWaitExt<N>: Send + Sync {
19 async fn new_payload_v1_wait(
22 &self,
23 payload: ExecutionPayloadV1,
24 ) -> TransportResult<PayloadStatus>;
25
26 async fn new_payload_v2_wait(
29 &self,
30 payload: ExecutionPayloadInputV2,
31 ) -> TransportResult<PayloadStatus>;
32
33 async fn new_payload_v3_wait(
36 &self,
37 payload: ExecutionPayloadV3,
38 versioned_hashes: Vec<B256>,
39 parent_beacon_block_root: B256,
40 ) -> TransportResult<PayloadStatus>;
41
42 async fn new_payload_v4_wait(
45 &self,
46 payload: ExecutionPayloadV3,
47 versioned_hashes: Vec<B256>,
48 parent_beacon_block_root: B256,
49 requests_hash: B256,
50 ) -> TransportResult<PayloadStatus>;
51
52 async fn fork_choice_updated_v1_wait(
55 &self,
56 fork_choice_state: ForkchoiceState,
57 payload_attributes: Option<PayloadAttributes>,
58 ) -> TransportResult<ForkchoiceUpdated>;
59
60 async fn fork_choice_updated_v2_wait(
63 &self,
64 fork_choice_state: ForkchoiceState,
65 payload_attributes: Option<PayloadAttributes>,
66 ) -> TransportResult<ForkchoiceUpdated>;
67
68 async fn fork_choice_updated_v3_wait(
71 &self,
72 fork_choice_state: ForkchoiceState,
73 payload_attributes: Option<PayloadAttributes>,
74 ) -> TransportResult<ForkchoiceUpdated>;
75}
76
77#[async_trait::async_trait]
78impl<N, P> EngineApiValidWaitExt<N> for P
79where
80 N: Network,
81 P: Provider<N> + EngineApi<N>,
82{
83 async fn new_payload_v1_wait(
84 &self,
85 payload: ExecutionPayloadV1,
86 ) -> TransportResult<PayloadStatus> {
87 let mut status = self.new_payload_v1(payload.clone()).await?;
88 while !status.is_valid() {
89 if status.is_invalid() {
90 error!(?status, ?payload, "Invalid newPayloadV1",);
91 panic!("Invalid newPayloadV1: {status:?}");
92 }
93 status = self.new_payload_v1(payload.clone()).await?;
94 }
95 Ok(status)
96 }
97
98 async fn new_payload_v2_wait(
99 &self,
100 payload: ExecutionPayloadInputV2,
101 ) -> TransportResult<PayloadStatus> {
102 let mut status = self.new_payload_v2(payload.clone()).await?;
103 while !status.is_valid() {
104 if status.is_invalid() {
105 error!(?status, ?payload, "Invalid newPayloadV2",);
106 panic!("Invalid newPayloadV2: {status:?}");
107 }
108 status = self.new_payload_v2(payload.clone()).await?;
109 }
110 Ok(status)
111 }
112
113 async fn new_payload_v3_wait(
114 &self,
115 payload: ExecutionPayloadV3,
116 versioned_hashes: Vec<B256>,
117 parent_beacon_block_root: B256,
118 ) -> TransportResult<PayloadStatus> {
119 let mut status = self
120 .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
121 .await?;
122 while !status.is_valid() {
123 if status.is_invalid() {
124 error!(
125 ?status,
126 ?payload,
127 ?versioned_hashes,
128 ?parent_beacon_block_root,
129 "Invalid newPayloadV3",
130 );
131 panic!("Invalid newPayloadV3: {status:?}");
132 }
133 if status.is_syncing() {
134 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
135 "invalid range: no canonical state found for parent of requested block",
136 ))
137 }
138 status = self
139 .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
140 .await?;
141 }
142 Ok(status)
143 }
144
145 async fn new_payload_v4_wait(
146 &self,
147 payload: ExecutionPayloadV3,
148 versioned_hashes: Vec<B256>,
149 parent_beacon_block_root: B256,
150 requests_hash: B256,
151 ) -> TransportResult<PayloadStatus> {
152 let mut status: PayloadStatus = self
156 .client()
157 .request(
158 "engine_newPayloadV4",
159 (
160 payload.clone(),
161 versioned_hashes.clone(),
162 parent_beacon_block_root,
163 RequestsOrHash::Hash(requests_hash),
164 ),
165 )
166 .await?;
167 while !status.is_valid() {
168 if status.is_invalid() {
169 error!(
170 ?status,
171 ?payload,
172 ?versioned_hashes,
173 ?parent_beacon_block_root,
174 "Invalid newPayloadV4",
175 );
176 panic!("Invalid newPayloadV4: {status:?}");
177 }
178 if status.is_syncing() {
179 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
180 "invalid range: no canonical state found for parent of requested block",
181 ))
182 }
183 status = self
184 .client()
185 .request(
186 "engine_newPayloadV4",
187 (
188 payload.clone(),
189 versioned_hashes.clone(),
190 parent_beacon_block_root,
191 RequestsOrHash::Hash(requests_hash),
192 ),
193 )
194 .await?;
195 }
196 Ok(status)
197 }
198
199 async fn fork_choice_updated_v1_wait(
200 &self,
201 fork_choice_state: ForkchoiceState,
202 payload_attributes: Option<PayloadAttributes>,
203 ) -> TransportResult<ForkchoiceUpdated> {
204 let mut status =
205 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
206
207 while !status.is_valid() {
208 if status.is_invalid() {
209 error!(
210 ?status,
211 ?fork_choice_state,
212 ?payload_attributes,
213 "Invalid forkchoiceUpdatedV1 message",
214 );
215 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
216 }
217 if status.is_syncing() {
218 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
219 "invalid range: no canonical state found for parent of requested block",
220 ))
221 }
222 status =
223 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
224 }
225
226 Ok(status)
227 }
228
229 async fn fork_choice_updated_v2_wait(
230 &self,
231 fork_choice_state: ForkchoiceState,
232 payload_attributes: Option<PayloadAttributes>,
233 ) -> TransportResult<ForkchoiceUpdated> {
234 let mut status =
235 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
236
237 while !status.is_valid() {
238 if status.is_invalid() {
239 error!(
240 ?status,
241 ?fork_choice_state,
242 ?payload_attributes,
243 "Invalid forkchoiceUpdatedV2 message",
244 );
245 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
246 }
247 if status.is_syncing() {
248 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
249 "invalid range: no canonical state found for parent of requested block",
250 ))
251 }
252 status =
253 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
254 }
255
256 Ok(status)
257 }
258
259 async fn fork_choice_updated_v3_wait(
260 &self,
261 fork_choice_state: ForkchoiceState,
262 payload_attributes: Option<PayloadAttributes>,
263 ) -> TransportResult<ForkchoiceUpdated> {
264 let mut status =
265 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
266
267 while !status.is_valid() {
268 if status.is_invalid() {
269 error!(
270 ?status,
271 ?fork_choice_state,
272 ?payload_attributes,
273 "Invalid forkchoiceUpdatedV3 message",
274 );
275 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
276 }
277 status =
278 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
279 }
280
281 Ok(status)
282 }
283}
284
285pub(crate) async fn call_new_payload<N, P: EngineApiValidWaitExt<N>>(
291 provider: P,
292 payload: ExecutionPayload,
293 sidecar: ExecutionPayloadSidecar,
294 parent_beacon_block_root: Option<B256>,
295 versioned_hashes: Vec<B256>,
296) -> TransportResult<EngineApiMessageVersion> {
297 match payload {
298 ExecutionPayload::V3(payload) => {
299 let parent_beacon_block_root = parent_beacon_block_root
301 .expect("parent_beacon_block_root is required for V3 payloads and higher");
302
303 if let Some(requests_hash) = sidecar.requests_hash() {
304 provider
305 .new_payload_v4_wait(
306 payload,
307 versioned_hashes,
308 parent_beacon_block_root,
309 requests_hash,
310 )
311 .await?;
312 Ok(EngineApiMessageVersion::V4)
313 } else {
314 provider
315 .new_payload_v3_wait(payload, versioned_hashes, parent_beacon_block_root)
316 .await?;
317 Ok(EngineApiMessageVersion::V3)
318 }
319 }
320 ExecutionPayload::V2(payload) => {
321 let input = ExecutionPayloadInputV2 {
322 execution_payload: payload.payload_inner,
323 withdrawals: Some(payload.withdrawals),
324 };
325
326 provider.new_payload_v2_wait(input).await?;
327
328 Ok(EngineApiMessageVersion::V2)
329 }
330 ExecutionPayload::V1(payload) => {
331 provider.new_payload_v1_wait(payload).await?;
332
333 Ok(EngineApiMessageVersion::V1)
334 }
335 }
336}
337
338pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
342 provider: P,
343 message_version: EngineApiMessageVersion,
344 forkchoice_state: ForkchoiceState,
345 payload_attributes: Option<PayloadAttributes>,
346) -> TransportResult<ForkchoiceUpdated> {
347 match message_version {
348 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 => {
349 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
350 }
351 EngineApiMessageVersion::V2 => {
352 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
353 }
354 EngineApiMessageVersion::V1 => {
355 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
356 }
357 }
358}