Skip to main content

reth_bench/
valid_payload.rs

1//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
2//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
3//! before sending additional calls.
4
5use alloy_eips::eip7685::Requests;
6use alloy_primitives::B256;
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9    ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ForkchoiceState,
10    ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use tracing::{debug, error};
16
17/// An extension trait for providers that implement the engine API, to wait for a VALID response.
18#[async_trait::async_trait]
19pub trait EngineApiValidWaitExt<N>: Send + Sync {
20    /// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
21    /// [`PayloadAttributes`], and waits until the response is VALID.
22    async fn fork_choice_updated_v1_wait(
23        &self,
24        fork_choice_state: ForkchoiceState,
25        payload_attributes: Option<PayloadAttributes>,
26    ) -> TransportResult<ForkchoiceUpdated>;
27
28    /// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
29    /// [`PayloadAttributes`], and waits until the response is VALID.
30    async fn fork_choice_updated_v2_wait(
31        &self,
32        fork_choice_state: ForkchoiceState,
33        payload_attributes: Option<PayloadAttributes>,
34    ) -> TransportResult<ForkchoiceUpdated>;
35
36    /// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
37    /// [`PayloadAttributes`], and waits until the response is VALID.
38    async fn fork_choice_updated_v3_wait(
39        &self,
40        fork_choice_state: ForkchoiceState,
41        payload_attributes: Option<PayloadAttributes>,
42    ) -> TransportResult<ForkchoiceUpdated>;
43}
44
45#[async_trait::async_trait]
46impl<N, P> EngineApiValidWaitExt<N> for P
47where
48    N: Network,
49    P: Provider<N> + EngineApi<N>,
50{
51    async fn fork_choice_updated_v1_wait(
52        &self,
53        fork_choice_state: ForkchoiceState,
54        payload_attributes: Option<PayloadAttributes>,
55    ) -> TransportResult<ForkchoiceUpdated> {
56        debug!(
57            target: "reth-bench",
58            method = "engine_forkchoiceUpdatedV1",
59            ?fork_choice_state,
60            ?payload_attributes,
61            "Sending forkchoiceUpdated"
62        );
63
64        let mut status =
65            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
66
67        while !status.is_valid() {
68            if status.is_invalid() {
69                error!(
70                    target: "reth-bench",
71                    ?status,
72                    ?fork_choice_state,
73                    ?payload_attributes,
74                    "Invalid forkchoiceUpdatedV1 message",
75                );
76                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
77            }
78            if status.is_syncing() {
79                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
80                    "invalid range: no canonical state found for parent of requested block",
81                ))
82            }
83            status =
84                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
85        }
86
87        Ok(status)
88    }
89
90    async fn fork_choice_updated_v2_wait(
91        &self,
92        fork_choice_state: ForkchoiceState,
93        payload_attributes: Option<PayloadAttributes>,
94    ) -> TransportResult<ForkchoiceUpdated> {
95        debug!(
96            target: "reth-bench",
97            method = "engine_forkchoiceUpdatedV2",
98            ?fork_choice_state,
99            ?payload_attributes,
100            "Sending forkchoiceUpdated"
101        );
102
103        let mut status =
104            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
105
106        while !status.is_valid() {
107            if status.is_invalid() {
108                error!(
109                    target: "reth-bench",
110                    ?status,
111                    ?fork_choice_state,
112                    ?payload_attributes,
113                    "Invalid forkchoiceUpdatedV2 message",
114                );
115                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
116            }
117            if status.is_syncing() {
118                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
119                    "invalid range: no canonical state found for parent of requested block",
120                ))
121            }
122            status =
123                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
124        }
125
126        Ok(status)
127    }
128
129    async fn fork_choice_updated_v3_wait(
130        &self,
131        fork_choice_state: ForkchoiceState,
132        payload_attributes: Option<PayloadAttributes>,
133    ) -> TransportResult<ForkchoiceUpdated> {
134        debug!(
135            target: "reth-bench",
136            method = "engine_forkchoiceUpdatedV3",
137            ?fork_choice_state,
138            ?payload_attributes,
139            "Sending forkchoiceUpdated"
140        );
141
142        let mut status =
143            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
144
145        while !status.is_valid() {
146            if status.is_invalid() {
147                error!(
148                    target: "reth-bench",
149                    ?status,
150                    ?fork_choice_state,
151                    ?payload_attributes,
152                    "Invalid forkchoiceUpdatedV3 message",
153                );
154                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
155            }
156            status =
157                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
158        }
159
160        Ok(status)
161    }
162}
163
164pub(crate) fn block_to_new_payload(
165    block: AnyRpcBlock,
166    is_optimism: bool,
167) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
168    let block = block
169        .into_inner()
170        .map_header(|header| header.map(|h| h.into_header_with_defaults()))
171        .try_map_transactions(|tx| {
172            // try to convert unknowns into op type so that we can also support optimism
173            tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
174        })?
175        .into_consensus();
176
177    // Convert to execution payload
178    let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
179    payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)
180}
181
182pub(crate) fn payload_to_new_payload(
183    payload: ExecutionPayload,
184    sidecar: ExecutionPayloadSidecar,
185    is_optimism: bool,
186    withdrawals_root: Option<B256>,
187    target_version: Option<EngineApiMessageVersion>,
188) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value)> {
189    let (version, params) = match payload {
190        ExecutionPayload::V3(payload) => {
191            let cancun = sidecar.cancun().unwrap();
192
193            if let Some(prague) = sidecar.prague() {
194                // Use target version if provided (for Osaka), otherwise default to V4
195                let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
196
197                if is_optimism {
198                    let withdrawals_root = withdrawals_root.ok_or_else(|| {
199                        eyre::eyre!("Missing withdrawals root for Optimism payload")
200                    })?;
201                    (
202                        version,
203                        serde_json::to_value((
204                            OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
205                            cancun.versioned_hashes.clone(),
206                            cancun.parent_beacon_block_root,
207                            Requests::default(),
208                        ))?,
209                    )
210                } else {
211                    // Extract actual Requests from RequestsOrHash
212                    let requests = prague.requests.requests_hash();
213                    (
214                        version,
215                        serde_json::to_value((
216                            payload,
217                            cancun.versioned_hashes.clone(),
218                            cancun.parent_beacon_block_root,
219                            requests,
220                        ))?,
221                    )
222                }
223            } else {
224                (
225                    EngineApiMessageVersion::V3,
226                    serde_json::to_value((
227                        payload,
228                        cancun.versioned_hashes.clone(),
229                        cancun.parent_beacon_block_root,
230                    ))?,
231                )
232            }
233        }
234        ExecutionPayload::V2(payload) => {
235            let input = ExecutionPayloadInputV2 {
236                execution_payload: payload.payload_inner,
237                withdrawals: Some(payload.withdrawals),
238            };
239
240            (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
241        }
242        ExecutionPayload::V1(payload) => {
243            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
244        }
245    };
246
247    Ok((version, params))
248}
249
250/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
251/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
252///
253/// # Panics
254/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
255pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
256    provider: P,
257    version: EngineApiMessageVersion,
258    params: serde_json::Value,
259) -> TransportResult<()> {
260    let method = version.method_name();
261
262    debug!(target: "reth-bench", method, "Sending newPayload");
263
264    let mut status: PayloadStatus = provider.client().request(method, &params).await?;
265
266    while !status.is_valid() {
267        if status.is_invalid() {
268            error!(target: "reth-bench", ?status, ?params, "Invalid {method}",);
269            return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(std::io::Error::other(
270                format!("Invalid {method}: {status:?}"),
271            ))))
272        }
273        if status.is_syncing() {
274            return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
275                "invalid range: no canonical state found for parent of requested block",
276            ))
277        }
278        status = provider.client().request(method, &params).await?;
279    }
280    Ok(())
281}
282
283/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
284/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
285/// actual engine api message call.
286///
287/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
288pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
289    provider: P,
290    message_version: EngineApiMessageVersion,
291    forkchoice_state: ForkchoiceState,
292    payload_attributes: Option<PayloadAttributes>,
293) -> TransportResult<ForkchoiceUpdated> {
294    // FCU V3 is used for both Cancun and Prague (there is no FCU V4)
295    match message_version {
296        EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
297            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
298        }
299        EngineApiMessageVersion::V2 => {
300            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
301        }
302        EngineApiMessageVersion::V1 => {
303            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
304        }
305    }
306}