reth_bench/
valid_payload.rs

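//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
//! before sending additional calls.
//!
//! The module also contains helpers that convert blocks and payloads into `engine_newPayload`
//! parameters and that dispatch engine API calls based on an [`EngineApiMessageVersion`].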
4
5use alloy_eips::eip7685::Requests;
6use alloy_primitives::B256;
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9    ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
10    ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use tracing::{debug, error};
16
17/// An extension trait for providers that implement the engine API, to wait for a VALID response.
18#[async_trait::async_trait]
19pub trait EngineApiValidWaitExt<N>: Send + Sync {
20    /// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
21    /// [`PayloadAttributes`], and waits until the response is VALID.
22    async fn fork_choice_updated_v1_wait(
23        &self,
24        fork_choice_state: ForkchoiceState,
25        payload_attributes: Option<PayloadAttributes>,
26    ) -> TransportResult<ForkchoiceUpdated>;
27
28    /// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
29    /// [`PayloadAttributes`], and waits until the response is VALID.
30    async fn fork_choice_updated_v2_wait(
31        &self,
32        fork_choice_state: ForkchoiceState,
33        payload_attributes: Option<PayloadAttributes>,
34    ) -> TransportResult<ForkchoiceUpdated>;
35
36    /// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
37    /// [`PayloadAttributes`], and waits until the response is VALID.
38    async fn fork_choice_updated_v3_wait(
39        &self,
40        fork_choice_state: ForkchoiceState,
41        payload_attributes: Option<PayloadAttributes>,
42    ) -> TransportResult<ForkchoiceUpdated>;
43}
44
45#[async_trait::async_trait]
46impl<N, P> EngineApiValidWaitExt<N> for P
47where
48    N: Network,
49    P: Provider<N> + EngineApi<N>,
50{
51    async fn fork_choice_updated_v1_wait(
52        &self,
53        fork_choice_state: ForkchoiceState,
54        payload_attributes: Option<PayloadAttributes>,
55    ) -> TransportResult<ForkchoiceUpdated> {
56        debug!(
57            method = "engine_forkchoiceUpdatedV1",
58            ?fork_choice_state,
59            ?payload_attributes,
60            "Sending forkchoiceUpdated"
61        );
62
63        let mut status =
64            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
65
66        while !status.is_valid() {
67            if status.is_invalid() {
68                error!(
69                    ?status,
70                    ?fork_choice_state,
71                    ?payload_attributes,
72                    "Invalid forkchoiceUpdatedV1 message",
73                );
74                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
75            }
76            if status.is_syncing() {
77                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
78                    "invalid range: no canonical state found for parent of requested block",
79                ))
80            }
81            status =
82                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
83        }
84
85        Ok(status)
86    }
87
88    async fn fork_choice_updated_v2_wait(
89        &self,
90        fork_choice_state: ForkchoiceState,
91        payload_attributes: Option<PayloadAttributes>,
92    ) -> TransportResult<ForkchoiceUpdated> {
93        debug!(
94            method = "engine_forkchoiceUpdatedV2",
95            ?fork_choice_state,
96            ?payload_attributes,
97            "Sending forkchoiceUpdated"
98        );
99
100        let mut status =
101            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
102
103        while !status.is_valid() {
104            if status.is_invalid() {
105                error!(
106                    ?status,
107                    ?fork_choice_state,
108                    ?payload_attributes,
109                    "Invalid forkchoiceUpdatedV2 message",
110                );
111                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
112            }
113            if status.is_syncing() {
114                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
115                    "invalid range: no canonical state found for parent of requested block",
116                ))
117            }
118            status =
119                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
120        }
121
122        Ok(status)
123    }
124
125    async fn fork_choice_updated_v3_wait(
126        &self,
127        fork_choice_state: ForkchoiceState,
128        payload_attributes: Option<PayloadAttributes>,
129    ) -> TransportResult<ForkchoiceUpdated> {
130        debug!(
131            method = "engine_forkchoiceUpdatedV3",
132            ?fork_choice_state,
133            ?payload_attributes,
134            "Sending forkchoiceUpdated"
135        );
136
137        let mut status =
138            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
139
140        while !status.is_valid() {
141            if status.is_invalid() {
142                error!(
143                    ?status,
144                    ?fork_choice_state,
145                    ?payload_attributes,
146                    "Invalid forkchoiceUpdatedV3 message",
147                );
148                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
149            }
150            status =
151                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
152        }
153
154        Ok(status)
155    }
156}
157
158pub(crate) fn block_to_new_payload(
159    block: AnyRpcBlock,
160    is_optimism: bool,
161) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
162    let block = block
163        .into_inner()
164        .map_header(|header| header.map(|h| h.into_header_with_defaults()))
165        .try_map_transactions(|tx| {
166            // try to convert unknowns into op type so that we can also support optimism
167            tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
168        })?
169        .into_consensus();
170
171    // Convert to execution payload
172    let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
173    payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
174}
175
176pub(crate) fn payload_to_new_payload(
177    payload: ExecutionPayload,
178    sidecar: ExecutionPayloadSidecar,
179    is_optimism: bool,
180    withdrawals_root: Option<B256>,
181    target_version: Option<EngineApiMessageVersion>,
182) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
183    let (version, params) = match payload {
184        ExecutionPayload::V3(payload) => {
185            let cancun = sidecar.cancun().unwrap();
186
187            if let Some(prague) = sidecar.prague() {
188                // Use target version if provided (for Osaka), otherwise default to V4
189                let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
190
191                if is_optimism {
192                    let withdrawals_root = withdrawals_root.ok_or_else(|| {
193                        eyre::eyre!("Missing withdrawals root for Optimism payload")
194                    })?;
195                    (
196                        version,
197                        serde_json::to_value((
198                            OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
199                            cancun.versioned_hashes.clone(),
200                            cancun.parent_beacon_block_root,
201                            Requests::default(),
202                        ))?,
203                    )
204                } else {
205                    // Extract actual Requests from RequestsOrHash
206                    let requests = prague.requests.requests_hash();
207                    (
208                        version,
209                        serde_json::to_value((
210                            payload,
211                            cancun.versioned_hashes.clone(),
212                            cancun.parent_beacon_block_root,
213                            requests,
214                        ))?,
215                    )
216                }
217            } else {
218                (
219                    EngineApiMessageVersion::V3,
220                    serde_json::to_value((
221                        payload,
222                        cancun.versioned_hashes.clone(),
223                        cancun.parent_beacon_block_root,
224                    ))?,
225                )
226            }
227        }
228        ExecutionPayload::V2(payload) => {
229            let input = ExecutionPayloadInputV2 {
230                execution_payload: payload.payload_inner,
231                withdrawals: Some(payload.withdrawals),
232            };
233
234            (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
235        }
236        ExecutionPayload::V1(payload) => {
237            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
238        }
239    };
240
241    Ok((version, params))
242}
243
244/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
245/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
246///
247/// # Panics
248/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
249pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
250    provider: P,
251    version: EngineApiMessageVersion,
252    params: serde_json::Value,
253) -> TransportResult<()> {
254    let method = version.method_name();
255
256    debug!(method, "Sending newPayload");
257
258    let mut status: PayloadStatus = provider.client().request(method, &params).await?;
259
260    while !status.is_valid() {
261        if status.is_invalid() {
262            error!(?status, ?params, "Invalid {method}",);
263            panic!("Invalid {method}: {status:?}");
264        }
265        if status.is_syncing() {
266            return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
267                "invalid range: no canonical state found for parent of requested block",
268            ))
269        }
270        status = provider.client().request(method, &params).await?;
271    }
272    Ok(())
273}
274
275/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
276/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
277/// actual engine api message call.
278///
279/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
280pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
281    provider: P,
282    message_version: EngineApiMessageVersion,
283    forkchoice_state: ForkchoiceState,
284    payload_attributes: Option<PayloadAttributes>,
285) -> TransportResult<ForkchoiceUpdated> {
286    // FCU V3 is used for both Cancun and Prague (there is no FCU V4)
287    match message_version {
288        EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
289            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
290        }
291        EngineApiMessageVersion::V2 => {
292            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
293        }
294        EngineApiMessageVersion::V1 => {
295            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
296        }
297    }
298}