Skip to main content

reth_bench/
valid_payload.rs

1//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
2//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
3//! before sending additional calls.
4
5use alloy_eips::eip7685::Requests;
6use alloy_primitives::{Bytes, B256};
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9    ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
10    ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use reth_rpc_api::RethNewPayloadInput;
16use serde::Deserialize;
17use std::time::Duration;
18use tracing::{debug, error};
19
20/// An extension trait for providers that implement the engine API, to wait for a VALID response.
21#[async_trait::async_trait]
22pub trait EngineApiValidWaitExt<N>: Send + Sync {
23    /// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
24    /// [`PayloadAttributes`], and waits until the response is VALID.
25    async fn fork_choice_updated_v1_wait(
26        &self,
27        fork_choice_state: ForkchoiceState,
28        payload_attributes: Option<PayloadAttributes>,
29    ) -> TransportResult<ForkchoiceUpdated>;
30
31    /// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
32    /// [`PayloadAttributes`], and waits until the response is VALID.
33    async fn fork_choice_updated_v2_wait(
34        &self,
35        fork_choice_state: ForkchoiceState,
36        payload_attributes: Option<PayloadAttributes>,
37    ) -> TransportResult<ForkchoiceUpdated>;
38
39    /// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
40    /// [`PayloadAttributes`], and waits until the response is VALID.
41    async fn fork_choice_updated_v3_wait(
42        &self,
43        fork_choice_state: ForkchoiceState,
44        payload_attributes: Option<PayloadAttributes>,
45    ) -> TransportResult<ForkchoiceUpdated>;
46}
47
48#[async_trait::async_trait]
49impl<N, P> EngineApiValidWaitExt<N> for P
50where
51    N: Network,
52    P: Provider<N> + EngineApi<N>,
53{
54    async fn fork_choice_updated_v1_wait(
55        &self,
56        fork_choice_state: ForkchoiceState,
57        payload_attributes: Option<PayloadAttributes>,
58    ) -> TransportResult<ForkchoiceUpdated> {
59        debug!(
60            target: "reth-bench",
61            method = "engine_forkchoiceUpdatedV1",
62            ?fork_choice_state,
63            ?payload_attributes,
64            "Sending forkchoiceUpdated"
65        );
66
67        let mut status =
68            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
69
70        while !status.is_valid() {
71            if status.is_invalid() {
72                error!(
73                    target: "reth-bench",
74                    ?status,
75                    ?fork_choice_state,
76                    ?payload_attributes,
77                    "Invalid forkchoiceUpdatedV1 message",
78                );
79                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
80            }
81            if status.is_syncing() {
82                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
83                    "invalid range: no canonical state found for parent of requested block",
84                ))
85            }
86            status =
87                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
88        }
89
90        Ok(status)
91    }
92
93    async fn fork_choice_updated_v2_wait(
94        &self,
95        fork_choice_state: ForkchoiceState,
96        payload_attributes: Option<PayloadAttributes>,
97    ) -> TransportResult<ForkchoiceUpdated> {
98        debug!(
99            target: "reth-bench",
100            method = "engine_forkchoiceUpdatedV2",
101            ?fork_choice_state,
102            ?payload_attributes,
103            "Sending forkchoiceUpdated"
104        );
105
106        let mut status =
107            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
108
109        while !status.is_valid() {
110            if status.is_invalid() {
111                error!(
112                    target: "reth-bench",
113                    ?status,
114                    ?fork_choice_state,
115                    ?payload_attributes,
116                    "Invalid forkchoiceUpdatedV2 message",
117                );
118                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
119            }
120            if status.is_syncing() {
121                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
122                    "invalid range: no canonical state found for parent of requested block",
123                ))
124            }
125            status =
126                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
127        }
128
129        Ok(status)
130    }
131
132    async fn fork_choice_updated_v3_wait(
133        &self,
134        fork_choice_state: ForkchoiceState,
135        payload_attributes: Option<PayloadAttributes>,
136    ) -> TransportResult<ForkchoiceUpdated> {
137        debug!(
138            target: "reth-bench",
139            method = "engine_forkchoiceUpdatedV3",
140            ?fork_choice_state,
141            ?payload_attributes,
142            "Sending forkchoiceUpdated"
143        );
144
145        let mut status =
146            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
147
148        while !status.is_valid() {
149            if status.is_invalid() {
150                error!(
151                    target: "reth-bench",
152                    ?status,
153                    ?fork_choice_state,
154                    ?payload_attributes,
155                    "Invalid forkchoiceUpdatedV3 message",
156                );
157                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
158            }
159            status =
160                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
161        }
162
163        Ok(status)
164    }
165}
166
167/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
168///
169/// Returns `(version, versioned_params, execution_data)`.
170pub(crate) fn block_to_new_payload(
171    block: AnyRpcBlock,
172    is_optimism: bool,
173    rlp: Option<Bytes>,
174    reth_new_payload: bool,
175) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
176    if let Some(rlp) = rlp {
177        return Ok((
178            None,
179            serde_json::to_value((RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),))?,
180        ));
181    }
182    let block = block
183        .into_inner()
184        .map_header(|header| header.map(|h| h.into_header_with_defaults()))
185        .try_map_transactions(|tx| {
186            // try to convert unknowns into op type so that we can also support optimism
187            tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
188        })?
189        .into_consensus();
190
191    // Convert to execution payload
192    let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
193    let (version, params, execution_data) =
194        payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
195
196    if reth_new_payload {
197        Ok((None, serde_json::to_value((RethNewPayloadInput::ExecutionData(execution_data),))?))
198    } else {
199        Ok((Some(version), params))
200    }
201}
202
203/// Converts an execution payload and sidecar into versioned engine API params and an
204/// [`ExecutionData`].
205///
206/// Returns `(version, versioned_params, execution_data)`.
207pub(crate) fn payload_to_new_payload(
208    payload: ExecutionPayload,
209    sidecar: ExecutionPayloadSidecar,
210    is_optimism: bool,
211    withdrawals_root: Option<B256>,
212    target_version: Option<EngineApiMessageVersion>,
213) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
214    let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
215
216    let (version, params) = match payload {
217        ExecutionPayload::V3(payload) => {
218            let cancun = sidecar.cancun().unwrap();
219
220            if let Some(prague) = sidecar.prague() {
221                // Use target version if provided (for Osaka), otherwise default to V4
222                let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
223
224                if is_optimism {
225                    let withdrawals_root = withdrawals_root.ok_or_else(|| {
226                        eyre::eyre!("Missing withdrawals root for Optimism payload")
227                    })?;
228                    (
229                        version,
230                        serde_json::to_value((
231                            OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
232                            cancun.versioned_hashes.clone(),
233                            cancun.parent_beacon_block_root,
234                            Requests::default(),
235                        ))?,
236                    )
237                } else {
238                    // Extract actual Requests from RequestsOrHash
239                    let requests = prague.requests.requests_hash();
240                    (
241                        version,
242                        serde_json::to_value((
243                            payload,
244                            cancun.versioned_hashes.clone(),
245                            cancun.parent_beacon_block_root,
246                            requests,
247                        ))?,
248                    )
249                }
250            } else {
251                (
252                    EngineApiMessageVersion::V3,
253                    serde_json::to_value((
254                        payload,
255                        cancun.versioned_hashes.clone(),
256                        cancun.parent_beacon_block_root,
257                    ))?,
258                )
259            }
260        }
261        ExecutionPayload::V2(payload) => {
262            let input = ExecutionPayloadInputV2 {
263                execution_payload: payload.payload_inner,
264                withdrawals: Some(payload.withdrawals),
265            };
266
267            (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
268        }
269        ExecutionPayload::V1(payload) => {
270            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
271        }
272    };
273
274    Ok((version, params, execution_data))
275}
276
277/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
278/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
279///
280/// # Panics
281/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
282#[allow(dead_code)]
283pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
284    provider: P,
285    version: Option<EngineApiMessageVersion>,
286    params: serde_json::Value,
287) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
288    call_new_payload_with_reth(provider, version, params).await
289}
290
291/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
292#[derive(Debug, Deserialize)]
293struct RethPayloadStatus {
294    latency_us: u64,
295    #[serde(default)]
296    persistence_wait_us: Option<u64>,
297    #[serde(default)]
298    execution_cache_wait_us: u64,
299    #[serde(default)]
300    sparse_trie_wait_us: u64,
301}
302
303/// Server-side timing breakdown from `reth_newPayload` endpoint.
304#[derive(Debug, Clone, Copy, Default)]
305pub(crate) struct NewPayloadTimingBreakdown {
306    /// Server-side execution latency.
307    pub(crate) latency: Duration,
308    /// Time spent waiting for persistence. `None` when no persistence was in-flight.
309    pub(crate) persistence_wait: Option<Duration>,
310    /// Time spent waiting for execution cache lock.
311    pub(crate) execution_cache_wait: Duration,
312    /// Time spent waiting for sparse trie lock.
313    pub(crate) sparse_trie_wait: Duration,
314}
315
316/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
317/// `version` is provided.
318///
319/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
320///
321/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
322/// the standard engine namespace.
323pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
324    provider: P,
325    version: Option<EngineApiMessageVersion>,
326    params: serde_json::Value,
327) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
328    let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
329
330    debug!(target: "reth-bench", method, "Sending newPayload");
331
332    let resp = loop {
333        let resp: serde_json::Value = provider.client().request(method, &params).await?;
334        let status = PayloadStatus::deserialize(&resp)?;
335
336        if status.is_valid() {
337            break resp;
338        }
339        if status.is_invalid() {
340            return Err(eyre::eyre!("Invalid {method}: {status:?}"));
341        }
342        if status.is_syncing() {
343            return Err(eyre::eyre!(
344                "invalid range: no canonical state found for parent of requested block"
345            ));
346        }
347    };
348
349    if version.is_some() {
350        return Ok(None);
351    }
352
353    let resp: RethPayloadStatus = serde_json::from_value(resp)?;
354
355    Ok(Some(NewPayloadTimingBreakdown {
356        latency: Duration::from_micros(resp.latency_us),
357        persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
358        execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
359        sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
360    }))
361}
362
363/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
364/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
365/// actual engine api message call.
366///
367/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
368pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
369    provider: P,
370    message_version: EngineApiMessageVersion,
371    forkchoice_state: ForkchoiceState,
372    payload_attributes: Option<PayloadAttributes>,
373) -> TransportResult<ForkchoiceUpdated> {
374    // FCU V3 is used for both Cancun and Prague (there is no FCU V4)
375    match message_version {
376        EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
377            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
378        }
379        EngineApiMessageVersion::V2 => {
380            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
381        }
382        EngineApiMessageVersion::V1 => {
383            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
384        }
385    }
386}
387
388/// Calls either `reth_forkchoiceUpdated` or the standard `engine_forkchoiceUpdated*` depending
389/// on `use_reth`.
390///
391/// When `use_reth` is true, uses the `reth_forkchoiceUpdated` endpoint which sends a regular FCU
392/// with no payload attributes.
393pub(crate) async fn call_forkchoice_updated_with_reth<
394    N: Network,
395    P: Provider<N> + EngineApiValidWaitExt<N>,
396>(
397    provider: P,
398    message_version: Option<EngineApiMessageVersion>,
399    forkchoice_state: ForkchoiceState,
400) -> TransportResult<ForkchoiceUpdated> {
401    if let Some(message_version) = message_version {
402        call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
403    } else {
404        let method = "reth_forkchoiceUpdated";
405        let reth_params = serde_json::to_value((forkchoice_state,))
406            .expect("ForkchoiceState serialization cannot fail");
407
408        debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
409
410        loop {
411            let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
412
413            if resp.is_valid() {
414                break Ok(resp)
415            }
416
417            if resp.is_invalid() {
418                error!(target: "reth-bench", ?resp, "Invalid {method}");
419                return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
420                    std::io::Error::other(format!("Invalid {method}: {resp:?}")),
421                )))
422            }
423            if resp.is_syncing() {
424                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
425                    "invalid range: no canonical state found for parent of requested block",
426                ))
427            }
428        }
429    }
430}