1use alloy_primitives::B256;
6use alloy_provider::{ext::EngineApi, Network};
7use alloy_rpc_types_engine::{
8 ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
9 ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
10};
11use alloy_transport::TransportResult;
12use reth_node_api::EngineApiMessageVersion;
13use tracing::error;
14
15#[async_trait::async_trait]
17pub trait EngineApiValidWaitExt<N>: Send + Sync {
18 async fn new_payload_v1_wait(
21 &self,
22 payload: ExecutionPayloadV1,
23 ) -> TransportResult<PayloadStatus>;
24
25 async fn new_payload_v2_wait(
28 &self,
29 payload: ExecutionPayloadInputV2,
30 ) -> TransportResult<PayloadStatus>;
31
32 async fn new_payload_v3_wait(
35 &self,
36 payload: ExecutionPayloadV3,
37 versioned_hashes: Vec<B256>,
38 parent_beacon_block_root: B256,
39 ) -> TransportResult<PayloadStatus>;
40
41 async fn fork_choice_updated_v1_wait(
44 &self,
45 fork_choice_state: ForkchoiceState,
46 payload_attributes: Option<PayloadAttributes>,
47 ) -> TransportResult<ForkchoiceUpdated>;
48
49 async fn fork_choice_updated_v2_wait(
52 &self,
53 fork_choice_state: ForkchoiceState,
54 payload_attributes: Option<PayloadAttributes>,
55 ) -> TransportResult<ForkchoiceUpdated>;
56
57 async fn fork_choice_updated_v3_wait(
60 &self,
61 fork_choice_state: ForkchoiceState,
62 payload_attributes: Option<PayloadAttributes>,
63 ) -> TransportResult<ForkchoiceUpdated>;
64}
65
66#[async_trait::async_trait]
67impl<N, P> EngineApiValidWaitExt<N> for P
68where
69 N: Network,
70 P: EngineApi<N>,
71{
72 async fn new_payload_v1_wait(
73 &self,
74 payload: ExecutionPayloadV1,
75 ) -> TransportResult<PayloadStatus> {
76 let mut status = self.new_payload_v1(payload.clone()).await?;
77 while !status.is_valid() {
78 if status.is_invalid() {
79 error!(?status, ?payload, "Invalid newPayloadV1",);
80 panic!("Invalid newPayloadV1: {status:?}");
81 }
82 status = self.new_payload_v1(payload.clone()).await?;
83 }
84 Ok(status)
85 }
86
87 async fn new_payload_v2_wait(
88 &self,
89 payload: ExecutionPayloadInputV2,
90 ) -> TransportResult<PayloadStatus> {
91 let mut status = self.new_payload_v2(payload.clone()).await?;
92 while !status.is_valid() {
93 if status.is_invalid() {
94 error!(?status, ?payload, "Invalid newPayloadV2",);
95 panic!("Invalid newPayloadV2: {status:?}");
96 }
97 status = self.new_payload_v2(payload.clone()).await?;
98 }
99 Ok(status)
100 }
101
102 async fn new_payload_v3_wait(
103 &self,
104 payload: ExecutionPayloadV3,
105 versioned_hashes: Vec<B256>,
106 parent_beacon_block_root: B256,
107 ) -> TransportResult<PayloadStatus> {
108 let mut status = self
109 .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
110 .await?;
111 while !status.is_valid() {
112 if status.is_invalid() {
113 error!(
114 ?status,
115 ?payload,
116 ?versioned_hashes,
117 ?parent_beacon_block_root,
118 "Invalid newPayloadV3",
119 );
120 panic!("Invalid newPayloadV3: {status:?}");
121 }
122 if status.is_syncing() {
123 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
124 "invalid range: no canonical state found for parent of requested block",
125 ))
126 }
127 status = self
128 .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
129 .await?;
130 }
131 Ok(status)
132 }
133
134 async fn fork_choice_updated_v1_wait(
135 &self,
136 fork_choice_state: ForkchoiceState,
137 payload_attributes: Option<PayloadAttributes>,
138 ) -> TransportResult<ForkchoiceUpdated> {
139 let mut status =
140 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
141
142 while !status.is_valid() {
143 if status.is_invalid() {
144 error!(
145 ?status,
146 ?fork_choice_state,
147 ?payload_attributes,
148 "Invalid forkchoiceUpdatedV1 message",
149 );
150 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
151 }
152 if status.is_syncing() {
153 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
154 "invalid range: no canonical state found for parent of requested block",
155 ))
156 }
157 status =
158 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
159 }
160
161 Ok(status)
162 }
163
164 async fn fork_choice_updated_v2_wait(
165 &self,
166 fork_choice_state: ForkchoiceState,
167 payload_attributes: Option<PayloadAttributes>,
168 ) -> TransportResult<ForkchoiceUpdated> {
169 let mut status =
170 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
171
172 while !status.is_valid() {
173 if status.is_invalid() {
174 error!(
175 ?status,
176 ?fork_choice_state,
177 ?payload_attributes,
178 "Invalid forkchoiceUpdatedV2 message",
179 );
180 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
181 }
182 if status.is_syncing() {
183 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
184 "invalid range: no canonical state found for parent of requested block",
185 ))
186 }
187 status =
188 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
189 }
190
191 Ok(status)
192 }
193
194 async fn fork_choice_updated_v3_wait(
195 &self,
196 fork_choice_state: ForkchoiceState,
197 payload_attributes: Option<PayloadAttributes>,
198 ) -> TransportResult<ForkchoiceUpdated> {
199 let mut status =
200 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
201
202 while !status.is_valid() {
203 if status.is_invalid() {
204 error!(
205 ?status,
206 ?fork_choice_state,
207 ?payload_attributes,
208 "Invalid forkchoiceUpdatedV3 message",
209 );
210 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
211 }
212 status =
213 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
214 }
215
216 Ok(status)
217 }
218}
219
220pub(crate) async fn call_new_payload<N, P: EngineApiValidWaitExt<N>>(
226 provider: P,
227 payload: ExecutionPayload,
228 parent_beacon_block_root: Option<B256>,
229 versioned_hashes: Vec<B256>,
230) -> TransportResult<EngineApiMessageVersion> {
231 match payload {
232 ExecutionPayload::V3(payload) => {
233 let parent_beacon_block_root = parent_beacon_block_root
235 .expect("parent_beacon_block_root is required for V3 payloads");
236 provider
237 .new_payload_v3_wait(payload, versioned_hashes, parent_beacon_block_root)
238 .await?;
239
240 Ok(EngineApiMessageVersion::V3)
241 }
242 ExecutionPayload::V2(payload) => {
243 let input = ExecutionPayloadInputV2 {
244 execution_payload: payload.payload_inner,
245 withdrawals: Some(payload.withdrawals),
246 };
247
248 provider.new_payload_v2_wait(input).await?;
249
250 Ok(EngineApiMessageVersion::V2)
251 }
252 ExecutionPayload::V1(payload) => {
253 provider.new_payload_v1_wait(payload).await?;
254
255 Ok(EngineApiMessageVersion::V1)
256 }
257 }
258}
259
260pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
264 provider: P,
265 message_version: EngineApiMessageVersion,
266 forkchoice_state: ForkchoiceState,
267 payload_attributes: Option<PayloadAttributes>,
268) -> TransportResult<ForkchoiceUpdated> {
269 match message_version {
270 EngineApiMessageVersion::V4 => todo!("V4 payloads not supported yet"),
271 EngineApiMessageVersion::V3 => {
272 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
273 }
274 EngineApiMessageVersion::V2 => {
275 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
276 }
277 EngineApiMessageVersion::V1 => {
278 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
279 }
280 }
281}