Skip to main content

reth_bench/
valid_payload.rs

1//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
2//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
3//! before sending additional calls.
4
5use alloy_consensus::TxEnvelope;
6use alloy_eips::eip7928::BlockAccessList;
7use alloy_primitives::Bytes;
8use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
9use alloy_rpc_types_engine::{
10    ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
11    ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
12};
13use alloy_transport::TransportResult;
14use reth_node_api::EngineApiMessageVersion;
15use reth_node_core::args::WaitForPersistence;
16use reth_rpc_api::RethNewPayloadInput;
17use serde::Deserialize;
18use std::time::Duration;
19use tracing::{debug, error};
20
21/// An extension trait for providers that implement the engine API, to wait for a VALID response.
22#[async_trait::async_trait]
23pub trait EngineApiValidWaitExt<N>: Send + Sync {
24    /// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
25    /// [`PayloadAttributes`], and waits until the response is VALID.
26    async fn fork_choice_updated_v1_wait(
27        &self,
28        fork_choice_state: ForkchoiceState,
29        payload_attributes: Option<PayloadAttributes>,
30    ) -> TransportResult<ForkchoiceUpdated>;
31
32    /// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
33    /// [`PayloadAttributes`], and waits until the response is VALID.
34    async fn fork_choice_updated_v2_wait(
35        &self,
36        fork_choice_state: ForkchoiceState,
37        payload_attributes: Option<PayloadAttributes>,
38    ) -> TransportResult<ForkchoiceUpdated>;
39
40    /// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
41    /// [`PayloadAttributes`], and waits until the response is VALID.
42    async fn fork_choice_updated_v3_wait(
43        &self,
44        fork_choice_state: ForkchoiceState,
45        payload_attributes: Option<PayloadAttributes>,
46    ) -> TransportResult<ForkchoiceUpdated>;
47
48    /// Calls `engine_forkChoiceUpdatedV4` with the given [`ForkchoiceState`] and optional
49    /// [`PayloadAttributes`], and waits until the response is VALID.
50    async fn fork_choice_updated_v4_wait(
51        &self,
52        fork_choice_state: ForkchoiceState,
53        payload_attributes: Option<PayloadAttributes>,
54    ) -> TransportResult<ForkchoiceUpdated>;
55}
56
57#[async_trait::async_trait]
58impl<N, P> EngineApiValidWaitExt<N> for P
59where
60    N: Network,
61    P: Provider<N> + EngineApi<N>,
62{
63    async fn fork_choice_updated_v1_wait(
64        &self,
65        fork_choice_state: ForkchoiceState,
66        payload_attributes: Option<PayloadAttributes>,
67    ) -> TransportResult<ForkchoiceUpdated> {
68        debug!(
69            target: "reth-bench",
70            method = "engine_forkchoiceUpdatedV1",
71            ?fork_choice_state,
72            ?payload_attributes,
73            "Sending forkchoiceUpdated"
74        );
75
76        let mut status =
77            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
78
79        while !status.is_valid() {
80            if status.is_invalid() {
81                error!(
82                    target: "reth-bench",
83                    ?status,
84                    ?fork_choice_state,
85                    ?payload_attributes,
86                    "Invalid forkchoiceUpdatedV1 message",
87                );
88                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
89            }
90            if status.is_syncing() {
91                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
92                    "invalid range: no canonical state found for parent of requested block",
93                ))
94            }
95            status =
96                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
97        }
98
99        Ok(status)
100    }
101
102    async fn fork_choice_updated_v2_wait(
103        &self,
104        fork_choice_state: ForkchoiceState,
105        payload_attributes: Option<PayloadAttributes>,
106    ) -> TransportResult<ForkchoiceUpdated> {
107        debug!(
108            target: "reth-bench",
109            method = "engine_forkchoiceUpdatedV2",
110            ?fork_choice_state,
111            ?payload_attributes,
112            "Sending forkchoiceUpdated"
113        );
114
115        let mut status =
116            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
117
118        while !status.is_valid() {
119            if status.is_invalid() {
120                error!(
121                    target: "reth-bench",
122                    ?status,
123                    ?fork_choice_state,
124                    ?payload_attributes,
125                    "Invalid forkchoiceUpdatedV2 message",
126                );
127                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
128            }
129            if status.is_syncing() {
130                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
131                    "invalid range: no canonical state found for parent of requested block",
132                ))
133            }
134            status =
135                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
136        }
137
138        Ok(status)
139    }
140
141    async fn fork_choice_updated_v3_wait(
142        &self,
143        fork_choice_state: ForkchoiceState,
144        payload_attributes: Option<PayloadAttributes>,
145    ) -> TransportResult<ForkchoiceUpdated> {
146        debug!(
147            target: "reth-bench",
148            method = "engine_forkchoiceUpdatedV3",
149            ?fork_choice_state,
150            ?payload_attributes,
151            "Sending forkchoiceUpdated"
152        );
153
154        let mut status =
155            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
156
157        while !status.is_valid() {
158            if status.is_invalid() {
159                error!(
160                    target: "reth-bench",
161                    ?status,
162                    ?fork_choice_state,
163                    ?payload_attributes,
164                    "Invalid forkchoiceUpdatedV3 message",
165                );
166                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
167            }
168            status =
169                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
170        }
171
172        Ok(status)
173    }
174
175    async fn fork_choice_updated_v4_wait(
176        &self,
177        fork_choice_state: ForkchoiceState,
178        payload_attributes: Option<PayloadAttributes>,
179    ) -> TransportResult<ForkchoiceUpdated> {
180        debug!(
181            target: "reth-bench",
182            method = "engine_forkchoiceUpdatedV3",
183            ?fork_choice_state,
184            ?payload_attributes,
185            "Sending forkchoiceUpdated"
186        );
187
188        let mut status =
189            self.fork_choice_updated_v4(fork_choice_state, payload_attributes.clone()).await?;
190
191        while !status.is_valid() {
192            if status.is_invalid() {
193                error!(
194                    target: "reth-bench",
195                    ?status,
196                    ?fork_choice_state,
197                    ?payload_attributes,
198                    "Invalid forkchoiceUpdatedV4 message",
199                );
200                panic!("Invalid forkchoiceUpdatedV4: {status:?}");
201            }
202            status =
203                self.fork_choice_updated_v4(fork_choice_state, payload_attributes.clone()).await?;
204        }
205
206        Ok(status)
207    }
208}
209
210/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
211///
212/// Returns `(version, versioned_params, execution_data)`.
213///
214/// `wait_for_persistence` controls how `wait_for_persistence` is passed to
215/// `reth_newPayload` on a per-block basis.
216pub(crate) fn block_to_new_payload(
217    block: AnyRpcBlock,
218    rlp: Option<Bytes>,
219    reth_new_payload: bool,
220    wait_for_persistence: WaitForPersistence,
221    no_wait_for_caches: bool,
222    bal: Option<BlockAccessList>,
223) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
224    let block_number = block.header.number;
225    let wait_for_persistence = wait_for_persistence.rpc_value(block_number);
226
227    if let Some(rlp) = rlp {
228        let bal = bal.map(|bal| alloy_rlp::encode(bal).into());
229        return Ok((
230            None,
231            serde_json::to_value((
232                RethNewPayloadInput::<ExecutionData>::BlockRlp { block: rlp, bal },
233                wait_for_persistence,
234                no_wait_for_caches.then_some(false),
235            ))?,
236        ));
237    }
238
239    let block = block
240        .into_inner()
241        .map_header(|header| header.map(|h| h.into_header_with_defaults()))
242        .try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
243            tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
244        })?
245        .into_consensus();
246
247    let block_access_list = alloy_rlp::encode(bal.unwrap_or_default());
248
249    let (payload, sidecar) =
250        ExecutionPayload::from_block_slow_with_bal(&block, block_access_list.into());
251    let (version, params, execution_data) = payload_to_new_payload(payload, sidecar, None)?;
252
253    if reth_new_payload {
254        Ok((
255            None,
256            serde_json::to_value((
257                RethNewPayloadInput::ExecutionData(execution_data),
258                wait_for_persistence,
259                no_wait_for_caches.then_some(false),
260            ))?,
261        ))
262    } else {
263        Ok((Some(version), params))
264    }
265}
266
267/// Converts an execution payload and sidecar into versioned engine API params and an
268/// [`ExecutionData`].
269///
270/// Returns `(version, versioned_params, execution_data)`.
271pub(crate) fn payload_to_new_payload(
272    payload: ExecutionPayload,
273    sidecar: ExecutionPayloadSidecar,
274    target_version: Option<EngineApiMessageVersion>,
275) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
276    let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
277
278    let (version, params) = match payload {
279        ExecutionPayload::V4(payload) => {
280            let cancun = sidecar
281                .cancun()
282                .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V4 payload"))?;
283            let version = target_version.unwrap_or(EngineApiMessageVersion::V6);
284            let requests = sidecar.prague().map(|p| p.requests.clone()).unwrap_or_default();
285            (
286                version,
287                serde_json::to_value((
288                    payload,
289                    cancun.versioned_hashes.clone(),
290                    cancun.parent_beacon_block_root,
291                    requests,
292                ))?,
293            )
294        }
295        ExecutionPayload::V3(payload) => {
296            let cancun = sidecar
297                .cancun()
298                .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V3 payload"))?;
299
300            if let Some(prague) = sidecar.prague() {
301                let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
302                let requests = prague.requests.clone();
303                (
304                    version,
305                    serde_json::to_value((
306                        payload,
307                        cancun.versioned_hashes.clone(),
308                        cancun.parent_beacon_block_root,
309                        requests,
310                    ))?,
311                )
312            } else {
313                (
314                    EngineApiMessageVersion::V3,
315                    serde_json::to_value((
316                        payload,
317                        cancun.versioned_hashes.clone(),
318                        cancun.parent_beacon_block_root,
319                    ))?,
320                )
321            }
322        }
323        ExecutionPayload::V2(payload) => {
324            let input = ExecutionPayloadInputV2 {
325                execution_payload: payload.payload_inner,
326                withdrawals: Some(payload.withdrawals),
327            };
328
329            (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
330        }
331        ExecutionPayload::V1(payload) => {
332            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
333        }
334    };
335
336    Ok((version, params, execution_data))
337}
338
339/// Calls the correct `engine_newPayload` method depending on the given execution payload and its
340/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
341///
342/// # Panics
343/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
344#[allow(dead_code)]
345pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
346    provider: P,
347    version: Option<EngineApiMessageVersion>,
348    params: serde_json::Value,
349) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
350    call_new_payload_with_reth(provider, version, params).await
351}
352
353/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
354#[derive(Debug, Deserialize)]
355struct RethPayloadStatus {
356    latency_us: u64,
357    #[serde(default)]
358    persistence_wait_us: u64,
359    #[serde(default)]
360    execution_cache_wait_us: u64,
361    #[serde(default)]
362    sparse_trie_wait_us: u64,
363}
364
/// Timing breakdown reported by the server for a `reth_newPayload` call.
///
/// The default value has every duration set to zero.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
    /// Execution latency as measured on the server.
    pub(crate) latency: Duration,
    /// How long the call waited on persistence (backpressure + explicit wait).
    pub(crate) persistence_wait: Duration,
    /// How long the call waited for the execution cache lock.
    pub(crate) execution_cache_wait: Duration,
    /// How long the call waited for the sparse trie lock.
    pub(crate) sparse_trie_wait: Duration,
}
377
378/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
379/// `version` is provided.
380///
381/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
382///
383/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
384/// the standard engine namespace.
385pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
386    provider: P,
387    version: Option<EngineApiMessageVersion>,
388    params: serde_json::Value,
389) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
390    let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
391
392    debug!(target: "reth-bench", method, "Sending newPayload");
393
394    let resp = loop {
395        let resp: serde_json::Value = provider.client().request(method, &params).await?;
396        let status = PayloadStatus::deserialize(&resp)?;
397
398        if status.is_valid() {
399            break resp;
400        }
401        if status.is_invalid() {
402            return Err(eyre::eyre!("Invalid {method}: {status:?}"));
403        }
404        if status.is_syncing() {
405            return Err(eyre::eyre!(
406                "invalid range: no canonical state found for parent of requested block"
407            ));
408        }
409    };
410
411    if version.is_some() {
412        return Ok(None);
413    }
414
415    let resp: RethPayloadStatus = serde_json::from_value(resp)?;
416
417    Ok(Some(NewPayloadTimingBreakdown {
418        latency: Duration::from_micros(resp.latency_us),
419        persistence_wait: Duration::from_micros(resp.persistence_wait_us),
420        execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
421        sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
422    }))
423}
424
425/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
426/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
427/// actual engine api message call.
428///
429/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
430pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
431    provider: P,
432    message_version: EngineApiMessageVersion,
433    forkchoice_state: ForkchoiceState,
434    payload_attributes: Option<PayloadAttributes>,
435) -> TransportResult<ForkchoiceUpdated> {
436    // FCU V3 is used for both Cancun and Prague (there is no FCU V4)
437    match message_version {
438        EngineApiMessageVersion::V6 => {
439            provider.fork_choice_updated_v4_wait(forkchoice_state, payload_attributes).await
440        }
441        EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
442            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
443        }
444        EngineApiMessageVersion::V2 => {
445            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
446        }
447        EngineApiMessageVersion::V1 => {
448            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
449        }
450    }
451}
452
453/// Calls either `reth_forkchoiceUpdated` or the standard `engine_forkchoiceUpdated*` depending
454/// on `use_reth`.
455///
456/// When `use_reth` is true, uses the `reth_forkchoiceUpdated` endpoint which sends a regular FCU
457/// with no payload attributes.
458pub(crate) async fn call_forkchoice_updated_with_reth<
459    N: Network,
460    P: Provider<N> + EngineApiValidWaitExt<N>,
461>(
462    provider: P,
463    message_version: Option<EngineApiMessageVersion>,
464    forkchoice_state: ForkchoiceState,
465) -> TransportResult<ForkchoiceUpdated> {
466    if let Some(message_version) = message_version {
467        call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
468    } else {
469        let method = "reth_forkchoiceUpdated";
470        let reth_params = serde_json::to_value((forkchoice_state,))
471            .expect("ForkchoiceState serialization cannot fail");
472
473        debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
474
475        loop {
476            let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
477
478            if resp.is_valid() {
479                break Ok(resp)
480            }
481
482            if resp.is_invalid() {
483                error!(target: "reth-bench", ?resp, "Invalid {method}");
484                return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
485                    std::io::Error::other(format!("Invalid {method}: {resp:?}")),
486                )))
487            }
488            if resp.is_syncing() {
489                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
490                    "invalid range: no canonical state found for parent of requested block",
491                ))
492            }
493        }
494    }
495}