Skip to main content

reth_bench/
valid_payload.rs

//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
//! before sending additional calls.
4
5use alloy_consensus::TxEnvelope;
6use alloy_primitives::Bytes;
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9    ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
10    ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use reth_node_api::EngineApiMessageVersion;
14use reth_node_core::args::WaitForPersistence;
15use reth_rpc_api::RethNewPayloadInput;
16use serde::Deserialize;
17use std::time::Duration;
18use tracing::{debug, error};
19
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
///
/// Every method takes the forkchoice state and optional payload attributes by value and
/// resolves to a [`TransportResult`] wrapping the [`ForkchoiceUpdated`] response.
///
/// NOTE(review): how a non-VALID response (INVALID / SYNCING / other) is handled is decided by
/// the implementor, not by this trait -- check the implementation before relying on it.
#[async_trait::async_trait]
pub trait EngineApiValidWaitExt<N>: Send + Sync {
    /// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
    /// [`PayloadAttributes`], and waits until the response is VALID.
    ///
    /// Returns the VALID [`ForkchoiceUpdated`] response on success.
    async fn fork_choice_updated_v1_wait(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<PayloadAttributes>,
    ) -> TransportResult<ForkchoiceUpdated>;

    /// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
    /// [`PayloadAttributes`], and waits until the response is VALID.
    ///
    /// Returns the VALID [`ForkchoiceUpdated`] response on success.
    async fn fork_choice_updated_v2_wait(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<PayloadAttributes>,
    ) -> TransportResult<ForkchoiceUpdated>;

    /// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
    /// [`PayloadAttributes`], and waits until the response is VALID.
    ///
    /// Returns the VALID [`ForkchoiceUpdated`] response on success.
    async fn fork_choice_updated_v3_wait(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<PayloadAttributes>,
    ) -> TransportResult<ForkchoiceUpdated>;
}
47
48#[async_trait::async_trait]
49impl<N, P> EngineApiValidWaitExt<N> for P
50where
51    N: Network,
52    P: Provider<N> + EngineApi<N>,
53{
54    async fn fork_choice_updated_v1_wait(
55        &self,
56        fork_choice_state: ForkchoiceState,
57        payload_attributes: Option<PayloadAttributes>,
58    ) -> TransportResult<ForkchoiceUpdated> {
59        debug!(
60            target: "reth-bench",
61            method = "engine_forkchoiceUpdatedV1",
62            ?fork_choice_state,
63            ?payload_attributes,
64            "Sending forkchoiceUpdated"
65        );
66
67        let mut status =
68            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
69
70        while !status.is_valid() {
71            if status.is_invalid() {
72                error!(
73                    target: "reth-bench",
74                    ?status,
75                    ?fork_choice_state,
76                    ?payload_attributes,
77                    "Invalid forkchoiceUpdatedV1 message",
78                );
79                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
80            }
81            if status.is_syncing() {
82                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
83                    "invalid range: no canonical state found for parent of requested block",
84                ))
85            }
86            status =
87                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
88        }
89
90        Ok(status)
91    }
92
93    async fn fork_choice_updated_v2_wait(
94        &self,
95        fork_choice_state: ForkchoiceState,
96        payload_attributes: Option<PayloadAttributes>,
97    ) -> TransportResult<ForkchoiceUpdated> {
98        debug!(
99            target: "reth-bench",
100            method = "engine_forkchoiceUpdatedV2",
101            ?fork_choice_state,
102            ?payload_attributes,
103            "Sending forkchoiceUpdated"
104        );
105
106        let mut status =
107            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
108
109        while !status.is_valid() {
110            if status.is_invalid() {
111                error!(
112                    target: "reth-bench",
113                    ?status,
114                    ?fork_choice_state,
115                    ?payload_attributes,
116                    "Invalid forkchoiceUpdatedV2 message",
117                );
118                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
119            }
120            if status.is_syncing() {
121                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
122                    "invalid range: no canonical state found for parent of requested block",
123                ))
124            }
125            status =
126                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
127        }
128
129        Ok(status)
130    }
131
132    async fn fork_choice_updated_v3_wait(
133        &self,
134        fork_choice_state: ForkchoiceState,
135        payload_attributes: Option<PayloadAttributes>,
136    ) -> TransportResult<ForkchoiceUpdated> {
137        debug!(
138            target: "reth-bench",
139            method = "engine_forkchoiceUpdatedV3",
140            ?fork_choice_state,
141            ?payload_attributes,
142            "Sending forkchoiceUpdated"
143        );
144
145        let mut status =
146            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
147
148        while !status.is_valid() {
149            if status.is_invalid() {
150                error!(
151                    target: "reth-bench",
152                    ?status,
153                    ?fork_choice_state,
154                    ?payload_attributes,
155                    "Invalid forkchoiceUpdatedV3 message",
156                );
157                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
158            }
159            status =
160                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
161        }
162
163        Ok(status)
164    }
165}
166
167/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
168///
169/// Returns `(version, versioned_params, execution_data)`.
170///
171/// `wait_for_persistence` controls how `wait_for_persistence` is passed to
172/// `reth_newPayload` on a per-block basis.
173pub(crate) fn block_to_new_payload(
174    block: AnyRpcBlock,
175    rlp: Option<Bytes>,
176    reth_new_payload: bool,
177    wait_for_persistence: WaitForPersistence,
178    no_wait_for_caches: bool,
179) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
180    let block_number = block.header.number;
181    let wait_for_persistence = wait_for_persistence.rpc_value(block_number);
182
183    if let Some(rlp) = rlp {
184        return Ok((
185            None,
186            serde_json::to_value((
187                RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),
188                wait_for_persistence,
189                no_wait_for_caches.then_some(false),
190            ))?,
191        ));
192    }
193
194    let block = block
195        .into_inner()
196        .map_header(|header| header.map(|h| h.into_header_with_defaults()))
197        .try_map_transactions(|tx| -> eyre::Result<TxEnvelope> {
198            tx.try_into().map_err(|_| eyre::eyre!("unsupported tx type"))
199        })?
200        .into_consensus();
201    let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
202    let (version, params, execution_data) = payload_to_new_payload(payload, sidecar, None)?;
203
204    if reth_new_payload {
205        Ok((
206            None,
207            serde_json::to_value((
208                RethNewPayloadInput::ExecutionData(execution_data),
209                wait_for_persistence,
210                no_wait_for_caches.then_some(false),
211            ))?,
212        ))
213    } else {
214        Ok((Some(version), params))
215    }
216}
217
218/// Converts an execution payload and sidecar into versioned engine API params and an
219/// [`ExecutionData`].
220///
221/// Returns `(version, versioned_params, execution_data)`.
222pub(crate) fn payload_to_new_payload(
223    payload: ExecutionPayload,
224    sidecar: ExecutionPayloadSidecar,
225    target_version: Option<EngineApiMessageVersion>,
226) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
227    let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
228
229    let (version, params) = match payload {
230        ExecutionPayload::V3(payload) => {
231            let cancun = sidecar
232                .cancun()
233                .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V3 payload"))?;
234
235            if let Some(prague) = sidecar.prague() {
236                let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
237                let requests = prague.requests.clone();
238                (
239                    version,
240                    serde_json::to_value((
241                        payload,
242                        cancun.versioned_hashes.clone(),
243                        cancun.parent_beacon_block_root,
244                        requests,
245                    ))?,
246                )
247            } else {
248                (
249                    EngineApiMessageVersion::V3,
250                    serde_json::to_value((
251                        payload,
252                        cancun.versioned_hashes.clone(),
253                        cancun.parent_beacon_block_root,
254                    ))?,
255                )
256            }
257        }
258        ExecutionPayload::V2(payload) => {
259            let input = ExecutionPayloadInputV2 {
260                execution_payload: payload.payload_inner,
261                withdrawals: Some(payload.withdrawals),
262            };
263
264            (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
265        }
266        ExecutionPayload::V1(payload) => {
267            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
268        }
269        ExecutionPayload::V4(payload) => {
270            let cancun = sidecar
271                .cancun()
272                .ok_or_else(|| eyre::eyre!("missing cancun sidecar for V4 payload"))?;
273            let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
274            let requests = sidecar.prague().map(|p| p.requests.clone()).unwrap_or_default();
275            (
276                version,
277                serde_json::to_value((
278                    payload,
279                    cancun.versioned_hashes.clone(),
280                    cancun.parent_beacon_block_root,
281                    requests,
282                ))?,
283            )
284        }
285    };
286
287    Ok((version, params, execution_data))
288}
289
290/// Calls the correct `engine_newPayload` method depending on the given execution payload and its
291/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
292///
293/// # Panics
294/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
295#[allow(dead_code)]
296pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
297    provider: P,
298    version: Option<EngineApiMessageVersion>,
299    params: serde_json::Value,
300) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
301    call_new_payload_with_reth(provider, version, params).await
302}
303
/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
///
/// Only the timing fields are deserialized here. All values are in microseconds.
#[derive(Debug, Deserialize)]
struct RethPayloadStatus {
    /// Server-measured latency in microseconds. Required: deserialization fails without it.
    latency_us: u64,
    /// Microseconds spent waiting on persistence. Defaults to `0` when absent.
    #[serde(default)]
    persistence_wait_us: u64,
    /// Microseconds spent waiting for the execution cache. Defaults to `0` when absent.
    #[serde(default)]
    execution_cache_wait_us: u64,
    /// Microseconds spent waiting for the sparse trie. Defaults to `0` when absent.
    #[serde(default)]
    sparse_trie_wait_us: u64,
}
315
/// Server-side timing breakdown from `reth_newPayload` endpoint.
///
/// The [`Default`] value has every duration set to zero.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct NewPayloadTimingBreakdown {
    /// Server-side execution latency.
    pub(crate) latency: Duration,
    /// Time spent waiting on persistence (backpressure + explicit wait).
    pub(crate) persistence_wait: Duration,
    /// Time spent waiting for execution cache lock.
    pub(crate) execution_cache_wait: Duration,
    /// Time spent waiting for sparse trie lock.
    pub(crate) sparse_trie_wait: Duration,
}
328
329/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
330/// `version` is provided.
331///
332/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
333///
334/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
335/// the standard engine namespace.
336pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
337    provider: P,
338    version: Option<EngineApiMessageVersion>,
339    params: serde_json::Value,
340) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
341    let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
342
343    debug!(target: "reth-bench", method, "Sending newPayload");
344
345    let resp = loop {
346        let resp: serde_json::Value = provider.client().request(method, &params).await?;
347        let status = PayloadStatus::deserialize(&resp)?;
348
349        if status.is_valid() {
350            break resp;
351        }
352        if status.is_invalid() {
353            return Err(eyre::eyre!("Invalid {method}: {status:?}"));
354        }
355        if status.is_syncing() {
356            return Err(eyre::eyre!(
357                "invalid range: no canonical state found for parent of requested block"
358            ));
359        }
360    };
361
362    if version.is_some() {
363        return Ok(None);
364    }
365
366    let resp: RethPayloadStatus = serde_json::from_value(resp)?;
367
368    Ok(Some(NewPayloadTimingBreakdown {
369        latency: Duration::from_micros(resp.latency_us),
370        persistence_wait: Duration::from_micros(resp.persistence_wait_us),
371        execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
372        sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
373    }))
374}
375
376/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
377/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
378/// actual engine api message call.
379///
380/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
381pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
382    provider: P,
383    message_version: EngineApiMessageVersion,
384    forkchoice_state: ForkchoiceState,
385    payload_attributes: Option<PayloadAttributes>,
386) -> TransportResult<ForkchoiceUpdated> {
387    // FCU V3 is used for both Cancun and Prague (there is no FCU V4)
388    match message_version {
389        EngineApiMessageVersion::V3 |
390        EngineApiMessageVersion::V4 |
391        EngineApiMessageVersion::V5 |
392        EngineApiMessageVersion::V6 => {
393            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
394        }
395        EngineApiMessageVersion::V2 => {
396            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
397        }
398        EngineApiMessageVersion::V1 => {
399            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
400        }
401    }
402}
403
404/// Calls either `reth_forkchoiceUpdated` or the standard `engine_forkchoiceUpdated*` depending
405/// on `use_reth`.
406///
407/// When `use_reth` is true, uses the `reth_forkchoiceUpdated` endpoint which sends a regular FCU
408/// with no payload attributes.
409pub(crate) async fn call_forkchoice_updated_with_reth<
410    N: Network,
411    P: Provider<N> + EngineApiValidWaitExt<N>,
412>(
413    provider: P,
414    message_version: Option<EngineApiMessageVersion>,
415    forkchoice_state: ForkchoiceState,
416) -> TransportResult<ForkchoiceUpdated> {
417    if let Some(message_version) = message_version {
418        call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
419    } else {
420        let method = "reth_forkchoiceUpdated";
421        let reth_params = serde_json::to_value((forkchoice_state,))
422            .expect("ForkchoiceState serialization cannot fail");
423
424        debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
425
426        loop {
427            let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
428
429            if resp.is_valid() {
430                break Ok(resp)
431            }
432
433            if resp.is_invalid() {
434                error!(target: "reth-bench", ?resp, "Invalid {method}");
435                return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
436                    std::io::Error::other(format!("Invalid {method}: {resp:?}")),
437                )))
438            }
439            if resp.is_syncing() {
440                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
441                    "invalid range: no canonical state found for parent of requested block",
442                ))
443            }
444        }
445    }
446}