1use alloy_eips::eip7685::Requests;
6use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
7use alloy_rpc_types_engine::{
8 ExecutionPayload, ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated,
9 PayloadAttributes, PayloadStatus,
10};
11use alloy_transport::TransportResult;
12use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
13use reth_node_api::EngineApiMessageVersion;
14use tracing::error;
15
16#[async_trait::async_trait]
18pub trait EngineApiValidWaitExt<N>: Send + Sync {
19 async fn fork_choice_updated_v1_wait(
22 &self,
23 fork_choice_state: ForkchoiceState,
24 payload_attributes: Option<PayloadAttributes>,
25 ) -> TransportResult<ForkchoiceUpdated>;
26
27 async fn fork_choice_updated_v2_wait(
30 &self,
31 fork_choice_state: ForkchoiceState,
32 payload_attributes: Option<PayloadAttributes>,
33 ) -> TransportResult<ForkchoiceUpdated>;
34
35 async fn fork_choice_updated_v3_wait(
38 &self,
39 fork_choice_state: ForkchoiceState,
40 payload_attributes: Option<PayloadAttributes>,
41 ) -> TransportResult<ForkchoiceUpdated>;
42}
43
44#[async_trait::async_trait]
45impl<N, P> EngineApiValidWaitExt<N> for P
46where
47 N: Network,
48 P: Provider<N> + EngineApi<N>,
49{
50 async fn fork_choice_updated_v1_wait(
51 &self,
52 fork_choice_state: ForkchoiceState,
53 payload_attributes: Option<PayloadAttributes>,
54 ) -> TransportResult<ForkchoiceUpdated> {
55 let mut status =
56 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
57
58 while !status.is_valid() {
59 if status.is_invalid() {
60 error!(
61 ?status,
62 ?fork_choice_state,
63 ?payload_attributes,
64 "Invalid forkchoiceUpdatedV1 message",
65 );
66 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
67 }
68 if status.is_syncing() {
69 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
70 "invalid range: no canonical state found for parent of requested block",
71 ))
72 }
73 status =
74 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
75 }
76
77 Ok(status)
78 }
79
80 async fn fork_choice_updated_v2_wait(
81 &self,
82 fork_choice_state: ForkchoiceState,
83 payload_attributes: Option<PayloadAttributes>,
84 ) -> TransportResult<ForkchoiceUpdated> {
85 let mut status =
86 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
87
88 while !status.is_valid() {
89 if status.is_invalid() {
90 error!(
91 ?status,
92 ?fork_choice_state,
93 ?payload_attributes,
94 "Invalid forkchoiceUpdatedV2 message",
95 );
96 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
97 }
98 if status.is_syncing() {
99 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
100 "invalid range: no canonical state found for parent of requested block",
101 ))
102 }
103 status =
104 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
105 }
106
107 Ok(status)
108 }
109
110 async fn fork_choice_updated_v3_wait(
111 &self,
112 fork_choice_state: ForkchoiceState,
113 payload_attributes: Option<PayloadAttributes>,
114 ) -> TransportResult<ForkchoiceUpdated> {
115 let mut status =
116 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
117
118 while !status.is_valid() {
119 if status.is_invalid() {
120 error!(
121 ?status,
122 ?fork_choice_state,
123 ?payload_attributes,
124 "Invalid forkchoiceUpdatedV3 message",
125 );
126 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
127 }
128 status =
129 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
130 }
131
132 Ok(status)
133 }
134}
135
136pub(crate) fn block_to_new_payload(
137 block: AnyRpcBlock,
138 is_optimism: bool,
139) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
140 let block = block
141 .into_inner()
142 .map_header(|header| header.map(|h| h.into_header_with_defaults()))
143 .try_map_transactions(|tx| {
144 tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
146 })?
147 .into_consensus();
148
149 let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
151
152 let (version, params) = match payload {
153 ExecutionPayload::V3(payload) => {
154 let cancun = sidecar.cancun().unwrap();
155
156 if let Some(prague) = sidecar.prague() {
157 if is_optimism {
158 (
159 EngineApiMessageVersion::V4,
160 serde_json::to_value((
161 OpExecutionPayloadV4 {
162 payload_inner: payload,
163 withdrawals_root: block.withdrawals_root.unwrap(),
164 },
165 cancun.versioned_hashes.clone(),
166 cancun.parent_beacon_block_root,
167 Requests::default(),
168 ))?,
169 )
170 } else {
171 (
172 EngineApiMessageVersion::V4,
173 serde_json::to_value((
174 payload,
175 cancun.versioned_hashes.clone(),
176 cancun.parent_beacon_block_root,
177 prague.requests.requests_hash(),
178 ))?,
179 )
180 }
181 } else {
182 (
183 EngineApiMessageVersion::V3,
184 serde_json::to_value((
185 payload,
186 cancun.versioned_hashes.clone(),
187 cancun.parent_beacon_block_root,
188 ))?,
189 )
190 }
191 }
192 ExecutionPayload::V2(payload) => {
193 let input = ExecutionPayloadInputV2 {
194 execution_payload: payload.payload_inner,
195 withdrawals: Some(payload.withdrawals),
196 };
197
198 (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
199 }
200 ExecutionPayload::V1(payload) => {
201 (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
202 }
203 };
204
205 Ok((version, params))
206}
207
208pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
214 provider: P,
215 version: EngineApiMessageVersion,
216 params: serde_json::Value,
217) -> TransportResult<()> {
218 let method = version.method_name();
219
220 let mut status: PayloadStatus = provider.client().request(method, ¶ms).await?;
221
222 while !status.is_valid() {
223 if status.is_invalid() {
224 error!(?status, ?params, "Invalid {method}",);
225 panic!("Invalid {method}: {status:?}");
226 }
227 if status.is_syncing() {
228 return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
229 "invalid range: no canonical state found for parent of requested block",
230 ))
231 }
232 status = provider.client().request(method, ¶ms).await?;
233 }
234 Ok(())
235}
236
237pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
241 provider: P,
242 message_version: EngineApiMessageVersion,
243 forkchoice_state: ForkchoiceState,
244 payload_attributes: Option<PayloadAttributes>,
245) -> TransportResult<ForkchoiceUpdated> {
246 match message_version {
247 EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
248 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
249 }
250 EngineApiMessageVersion::V2 => {
251 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
252 }
253 EngineApiMessageVersion::V1 => {
254 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
255 }
256 }
257}