Skip to main content

reth_bench/
valid_payload.rs

1//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
2//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
3//! before sending additional calls.
4
5use alloy_eips::eip7685::Requests;
6use alloy_primitives::{Bytes, B256};
7use alloy_provider::{ext::EngineApi, network::AnyRpcBlock, Network, Provider};
8use alloy_rpc_types_engine::{
9    ExecutionData, ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadSidecar,
10    ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
11};
12use alloy_transport::TransportResult;
13use op_alloy_rpc_types_engine::OpExecutionPayloadV4;
14use reth_node_api::EngineApiMessageVersion;
15use reth_node_core::args::WaitForPersistence;
16use reth_rpc_api::RethNewPayloadInput;
17use serde::Deserialize;
18use std::time::Duration;
19use tracing::{debug, error};
20
21/// An extension trait for providers that implement the engine API, to wait for a VALID response.
22#[async_trait::async_trait]
23pub trait EngineApiValidWaitExt<N>: Send + Sync {
24    /// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
25    /// [`PayloadAttributes`], and waits until the response is VALID.
26    async fn fork_choice_updated_v1_wait(
27        &self,
28        fork_choice_state: ForkchoiceState,
29        payload_attributes: Option<PayloadAttributes>,
30    ) -> TransportResult<ForkchoiceUpdated>;
31
32    /// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
33    /// [`PayloadAttributes`], and waits until the response is VALID.
34    async fn fork_choice_updated_v2_wait(
35        &self,
36        fork_choice_state: ForkchoiceState,
37        payload_attributes: Option<PayloadAttributes>,
38    ) -> TransportResult<ForkchoiceUpdated>;
39
40    /// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
41    /// [`PayloadAttributes`], and waits until the response is VALID.
42    async fn fork_choice_updated_v3_wait(
43        &self,
44        fork_choice_state: ForkchoiceState,
45        payload_attributes: Option<PayloadAttributes>,
46    ) -> TransportResult<ForkchoiceUpdated>;
47}
48
49#[async_trait::async_trait]
50impl<N, P> EngineApiValidWaitExt<N> for P
51where
52    N: Network,
53    P: Provider<N> + EngineApi<N>,
54{
55    async fn fork_choice_updated_v1_wait(
56        &self,
57        fork_choice_state: ForkchoiceState,
58        payload_attributes: Option<PayloadAttributes>,
59    ) -> TransportResult<ForkchoiceUpdated> {
60        debug!(
61            target: "reth-bench",
62            method = "engine_forkchoiceUpdatedV1",
63            ?fork_choice_state,
64            ?payload_attributes,
65            "Sending forkchoiceUpdated"
66        );
67
68        let mut status =
69            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
70
71        while !status.is_valid() {
72            if status.is_invalid() {
73                error!(
74                    target: "reth-bench",
75                    ?status,
76                    ?fork_choice_state,
77                    ?payload_attributes,
78                    "Invalid forkchoiceUpdatedV1 message",
79                );
80                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
81            }
82            if status.is_syncing() {
83                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
84                    "invalid range: no canonical state found for parent of requested block",
85                ))
86            }
87            status =
88                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
89        }
90
91        Ok(status)
92    }
93
94    async fn fork_choice_updated_v2_wait(
95        &self,
96        fork_choice_state: ForkchoiceState,
97        payload_attributes: Option<PayloadAttributes>,
98    ) -> TransportResult<ForkchoiceUpdated> {
99        debug!(
100            target: "reth-bench",
101            method = "engine_forkchoiceUpdatedV2",
102            ?fork_choice_state,
103            ?payload_attributes,
104            "Sending forkchoiceUpdated"
105        );
106
107        let mut status =
108            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
109
110        while !status.is_valid() {
111            if status.is_invalid() {
112                error!(
113                    target: "reth-bench",
114                    ?status,
115                    ?fork_choice_state,
116                    ?payload_attributes,
117                    "Invalid forkchoiceUpdatedV2 message",
118                );
119                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
120            }
121            if status.is_syncing() {
122                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
123                    "invalid range: no canonical state found for parent of requested block",
124                ))
125            }
126            status =
127                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
128        }
129
130        Ok(status)
131    }
132
133    async fn fork_choice_updated_v3_wait(
134        &self,
135        fork_choice_state: ForkchoiceState,
136        payload_attributes: Option<PayloadAttributes>,
137    ) -> TransportResult<ForkchoiceUpdated> {
138        debug!(
139            target: "reth-bench",
140            method = "engine_forkchoiceUpdatedV3",
141            ?fork_choice_state,
142            ?payload_attributes,
143            "Sending forkchoiceUpdated"
144        );
145
146        let mut status =
147            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
148
149        while !status.is_valid() {
150            if status.is_invalid() {
151                error!(
152                    target: "reth-bench",
153                    ?status,
154                    ?fork_choice_state,
155                    ?payload_attributes,
156                    "Invalid forkchoiceUpdatedV3 message",
157                );
158                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
159            }
160            status =
161                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
162        }
163
164        Ok(status)
165    }
166}
167
168/// Converts an RPC block into versioned engine API params and an [`ExecutionData`].
169///
170/// Returns `(version, versioned_params, execution_data)`.
171///
172/// `wait_for_persistence` controls how `wait_for_persistence` is passed to
173/// `reth_newPayload` on a per-block basis.
174pub(crate) fn block_to_new_payload(
175    block: AnyRpcBlock,
176    is_optimism: bool,
177    rlp: Option<Bytes>,
178    reth_new_payload: bool,
179    wait_for_persistence: WaitForPersistence,
180    no_wait_for_caches: bool,
181) -> eyre::Result<(Option<EngineApiMessageVersion>, serde_json::Value)> {
182    let block_number = block.header.number;
183    let wait_for_persistence = wait_for_persistence.rpc_value(block_number);
184
185    if let Some(rlp) = rlp {
186        return Ok((
187            None,
188            serde_json::to_value((
189                RethNewPayloadInput::<ExecutionData>::BlockRlp(rlp),
190                wait_for_persistence,
191                no_wait_for_caches.then_some(false),
192            ))?,
193        ));
194    }
195    let block = block
196        .into_inner()
197        .map_header(|header| header.map(|h| h.into_header_with_defaults()))
198        .try_map_transactions(|tx| {
199            // try to convert unknowns into op type so that we can also support optimism
200            tx.try_into_either::<op_alloy_consensus::OpTxEnvelope>()
201        })?
202        .into_consensus();
203
204    // Convert to execution payload
205    let (payload, sidecar) = ExecutionPayload::from_block_slow(&block);
206    let (version, params, execution_data) =
207        payload_to_new_payload(payload, sidecar, is_optimism, block.withdrawals_root, None)?;
208
209    if reth_new_payload {
210        Ok((
211            None,
212            serde_json::to_value((
213                RethNewPayloadInput::ExecutionData(execution_data),
214                wait_for_persistence,
215                no_wait_for_caches.then_some(false),
216            ))?,
217        ))
218    } else {
219        Ok((Some(version), params))
220    }
221}
222
223/// Converts an execution payload and sidecar into versioned engine API params and an
224/// [`ExecutionData`].
225///
226/// Returns `(version, versioned_params, execution_data)`.
227pub(crate) fn payload_to_new_payload(
228    payload: ExecutionPayload,
229    sidecar: ExecutionPayloadSidecar,
230    is_optimism: bool,
231    withdrawals_root: Option<B256>,
232    target_version: Option<EngineApiMessageVersion>,
233) -> eyre::Result<(EngineApiMessageVersion, serde_json::Value, ExecutionData)> {
234    let execution_data = ExecutionData { payload: payload.clone(), sidecar: sidecar.clone() };
235
236    let (version, params) = match payload {
237        ExecutionPayload::V3(payload) => {
238            let cancun = sidecar.cancun().unwrap();
239
240            if let Some(prague) = sidecar.prague() {
241                // Use target version if provided (for Osaka), otherwise default to V4
242                let version = target_version.unwrap_or(EngineApiMessageVersion::V4);
243
244                if is_optimism {
245                    let withdrawals_root = withdrawals_root.ok_or_else(|| {
246                        eyre::eyre!("Missing withdrawals root for Optimism payload")
247                    })?;
248                    (
249                        version,
250                        serde_json::to_value((
251                            OpExecutionPayloadV4 { payload_inner: payload, withdrawals_root },
252                            cancun.versioned_hashes.clone(),
253                            cancun.parent_beacon_block_root,
254                            Requests::default(),
255                        ))?,
256                    )
257                } else {
258                    // Preserve the original RequestsOrHash payload for engine_newPayloadV4.
259                    let requests = prague.requests.clone();
260                    (
261                        version,
262                        serde_json::to_value((
263                            payload,
264                            cancun.versioned_hashes.clone(),
265                            cancun.parent_beacon_block_root,
266                            requests,
267                        ))?,
268                    )
269                }
270            } else {
271                (
272                    EngineApiMessageVersion::V3,
273                    serde_json::to_value((
274                        payload,
275                        cancun.versioned_hashes.clone(),
276                        cancun.parent_beacon_block_root,
277                    ))?,
278                )
279            }
280        }
281        ExecutionPayload::V2(payload) => {
282            let input = ExecutionPayloadInputV2 {
283                execution_payload: payload.payload_inner,
284                withdrawals: Some(payload.withdrawals),
285            };
286
287            (EngineApiMessageVersion::V2, serde_json::to_value((input,))?)
288        }
289        ExecutionPayload::V1(payload) => {
290            (EngineApiMessageVersion::V1, serde_json::to_value((payload,))?)
291        }
292    };
293
294    Ok((version, params, execution_data))
295}
296
297/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
298/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
299///
300/// # Panics
301/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
302#[allow(dead_code)]
303pub(crate) async fn call_new_payload<N: Network, P: Provider<N>>(
304    provider: P,
305    version: Option<EngineApiMessageVersion>,
306    params: serde_json::Value,
307) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
308    call_new_payload_with_reth(provider, version, params).await
309}
310
311/// Response from `reth_newPayload` endpoint, which includes server-measured latency.
312#[derive(Debug, Deserialize)]
313struct RethPayloadStatus {
314    latency_us: u64,
315    #[serde(default)]
316    persistence_wait_us: Option<u64>,
317    #[serde(default)]
318    execution_cache_wait_us: u64,
319    #[serde(default)]
320    sparse_trie_wait_us: u64,
321}
322
323/// Server-side timing breakdown from `reth_newPayload` endpoint.
324#[derive(Debug, Clone, Copy, Default)]
325pub(crate) struct NewPayloadTimingBreakdown {
326    /// Server-side execution latency.
327    pub(crate) latency: Duration,
328    /// Time spent waiting for persistence. `None` when no persistence was in-flight.
329    pub(crate) persistence_wait: Option<Duration>,
330    /// Time spent waiting for execution cache lock.
331    pub(crate) execution_cache_wait: Duration,
332    /// Time spent waiting for sparse trie lock.
333    pub(crate) sparse_trie_wait: Duration,
334}
335
336/// Calls either `engine_newPayload*` or `reth_newPayload` depending on whether
337/// `version` is provided.
338///
339/// When `version` is `None`, uses `reth_newPayload` endpoint with provided params.
340///
341/// Returns the server-reported timing breakdown when using the reth namespace, or `None` for
342/// the standard engine namespace.
343pub(crate) async fn call_new_payload_with_reth<N: Network, P: Provider<N>>(
344    provider: P,
345    version: Option<EngineApiMessageVersion>,
346    params: serde_json::Value,
347) -> eyre::Result<Option<NewPayloadTimingBreakdown>> {
348    let method = version.map(|v| v.method_name()).unwrap_or("reth_newPayload");
349
350    debug!(target: "reth-bench", method, "Sending newPayload");
351
352    let resp = loop {
353        let resp: serde_json::Value = provider.client().request(method, &params).await?;
354        let status = PayloadStatus::deserialize(&resp)?;
355
356        if status.is_valid() {
357            break resp;
358        }
359        if status.is_invalid() {
360            return Err(eyre::eyre!("Invalid {method}: {status:?}"));
361        }
362        if status.is_syncing() {
363            return Err(eyre::eyre!(
364                "invalid range: no canonical state found for parent of requested block"
365            ));
366        }
367    };
368
369    if version.is_some() {
370        return Ok(None);
371    }
372
373    let resp: RethPayloadStatus = serde_json::from_value(resp)?;
374
375    Ok(Some(NewPayloadTimingBreakdown {
376        latency: Duration::from_micros(resp.latency_us),
377        persistence_wait: resp.persistence_wait_us.map(Duration::from_micros),
378        execution_cache_wait: Duration::from_micros(resp.execution_cache_wait_us),
379        sparse_trie_wait: Duration::from_micros(resp.sparse_trie_wait_us),
380    }))
381}
382
383/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
384/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
385/// actual engine api message call.
386///
387/// Note: For Prague (V4), we still use forkchoiceUpdatedV3 as there is no V4.
388pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
389    provider: P,
390    message_version: EngineApiMessageVersion,
391    forkchoice_state: ForkchoiceState,
392    payload_attributes: Option<PayloadAttributes>,
393) -> TransportResult<ForkchoiceUpdated> {
394    // FCU V3 is used for both Cancun and Prague (there is no FCU V4)
395    match message_version {
396        EngineApiMessageVersion::V3 | EngineApiMessageVersion::V4 | EngineApiMessageVersion::V5 => {
397            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
398        }
399        EngineApiMessageVersion::V2 => {
400            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
401        }
402        EngineApiMessageVersion::V1 => {
403            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
404        }
405    }
406}
407
408/// Calls either `reth_forkchoiceUpdated` or the standard `engine_forkchoiceUpdated*` depending
409/// on `use_reth`.
410///
411/// When `use_reth` is true, uses the `reth_forkchoiceUpdated` endpoint which sends a regular FCU
412/// with no payload attributes.
413pub(crate) async fn call_forkchoice_updated_with_reth<
414    N: Network,
415    P: Provider<N> + EngineApiValidWaitExt<N>,
416>(
417    provider: P,
418    message_version: Option<EngineApiMessageVersion>,
419    forkchoice_state: ForkchoiceState,
420) -> TransportResult<ForkchoiceUpdated> {
421    if let Some(message_version) = message_version {
422        call_forkchoice_updated(provider, message_version, forkchoice_state, None).await
423    } else {
424        let method = "reth_forkchoiceUpdated";
425        let reth_params = serde_json::to_value((forkchoice_state,))
426            .expect("ForkchoiceState serialization cannot fail");
427
428        debug!(target: "reth-bench", method, "Sending forkchoiceUpdated");
429
430        loop {
431            let resp: ForkchoiceUpdated = provider.client().request(method, &reth_params).await?;
432
433            if resp.is_valid() {
434                break Ok(resp)
435            }
436
437            if resp.is_invalid() {
438                error!(target: "reth-bench", ?resp, "Invalid {method}");
439                return Err(alloy_json_rpc::RpcError::LocalUsageError(Box::new(
440                    std::io::Error::other(format!("Invalid {method}: {resp:?}")),
441                )))
442            }
443            if resp.is_syncing() {
444                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
445                    "invalid range: no canonical state found for parent of requested block",
446                ))
447            }
448        }
449    }
450}