reth_bench/
valid_payload.rs

1//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
2//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
3//! before sending additional calls.
4
5use alloy_primitives::B256;
6use alloy_provider::{ext::EngineApi, Network};
7use alloy_rpc_types_engine::{
8    ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
9    ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
10};
11use alloy_transport::TransportResult;
12use reth_node_api::EngineApiMessageVersion;
13use tracing::error;
14
15/// An extension trait for providers that implement the engine API, to wait for a VALID response.
16#[async_trait::async_trait]
17pub trait EngineApiValidWaitExt<N>: Send + Sync {
18    /// Calls `engine_newPayloadV1` with the given [ExecutionPayloadV1], and waits until the
19    /// response is VALID.
20    async fn new_payload_v1_wait(
21        &self,
22        payload: ExecutionPayloadV1,
23    ) -> TransportResult<PayloadStatus>;
24
25    /// Calls `engine_newPayloadV2` with the given [ExecutionPayloadInputV2], and waits until the
26    /// response is VALID.
27    async fn new_payload_v2_wait(
28        &self,
29        payload: ExecutionPayloadInputV2,
30    ) -> TransportResult<PayloadStatus>;
31
32    /// Calls `engine_newPayloadV3` with the given [ExecutionPayloadV3], parent beacon block root,
33    /// and versioned hashes, and waits until the response is VALID.
34    async fn new_payload_v3_wait(
35        &self,
36        payload: ExecutionPayloadV3,
37        versioned_hashes: Vec<B256>,
38        parent_beacon_block_root: B256,
39    ) -> TransportResult<PayloadStatus>;
40
41    /// Calls `engine_forkChoiceUpdatedV1` with the given [ForkchoiceState] and optional
42    /// [PayloadAttributes], and waits until the response is VALID.
43    async fn fork_choice_updated_v1_wait(
44        &self,
45        fork_choice_state: ForkchoiceState,
46        payload_attributes: Option<PayloadAttributes>,
47    ) -> TransportResult<ForkchoiceUpdated>;
48
49    /// Calls `engine_forkChoiceUpdatedV2` with the given [ForkchoiceState] and optional
50    /// [PayloadAttributes], and waits until the response is VALID.
51    async fn fork_choice_updated_v2_wait(
52        &self,
53        fork_choice_state: ForkchoiceState,
54        payload_attributes: Option<PayloadAttributes>,
55    ) -> TransportResult<ForkchoiceUpdated>;
56
57    /// Calls `engine_forkChoiceUpdatedV3` with the given [ForkchoiceState] and optional
58    /// [PayloadAttributes], and waits until the response is VALID.
59    async fn fork_choice_updated_v3_wait(
60        &self,
61        fork_choice_state: ForkchoiceState,
62        payload_attributes: Option<PayloadAttributes>,
63    ) -> TransportResult<ForkchoiceUpdated>;
64}
65
66#[async_trait::async_trait]
67impl<N, P> EngineApiValidWaitExt<N> for P
68where
69    N: Network,
70    P: EngineApi<N>,
71{
72    async fn new_payload_v1_wait(
73        &self,
74        payload: ExecutionPayloadV1,
75    ) -> TransportResult<PayloadStatus> {
76        let mut status = self.new_payload_v1(payload.clone()).await?;
77        while !status.is_valid() {
78            if status.is_invalid() {
79                error!(?status, ?payload, "Invalid newPayloadV1",);
80                panic!("Invalid newPayloadV1: {status:?}");
81            }
82            status = self.new_payload_v1(payload.clone()).await?;
83        }
84        Ok(status)
85    }
86
87    async fn new_payload_v2_wait(
88        &self,
89        payload: ExecutionPayloadInputV2,
90    ) -> TransportResult<PayloadStatus> {
91        let mut status = self.new_payload_v2(payload.clone()).await?;
92        while !status.is_valid() {
93            if status.is_invalid() {
94                error!(?status, ?payload, "Invalid newPayloadV2",);
95                panic!("Invalid newPayloadV2: {status:?}");
96            }
97            status = self.new_payload_v2(payload.clone()).await?;
98        }
99        Ok(status)
100    }
101
102    async fn new_payload_v3_wait(
103        &self,
104        payload: ExecutionPayloadV3,
105        versioned_hashes: Vec<B256>,
106        parent_beacon_block_root: B256,
107    ) -> TransportResult<PayloadStatus> {
108        let mut status = self
109            .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
110            .await?;
111        while !status.is_valid() {
112            if status.is_invalid() {
113                error!(
114                    ?status,
115                    ?payload,
116                    ?versioned_hashes,
117                    ?parent_beacon_block_root,
118                    "Invalid newPayloadV3",
119                );
120                panic!("Invalid newPayloadV3: {status:?}");
121            }
122            if status.is_syncing() {
123                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
124                    "invalid range: no canonical state found for parent of requested block",
125                ))
126            }
127            status = self
128                .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
129                .await?;
130        }
131        Ok(status)
132    }
133
134    async fn fork_choice_updated_v1_wait(
135        &self,
136        fork_choice_state: ForkchoiceState,
137        payload_attributes: Option<PayloadAttributes>,
138    ) -> TransportResult<ForkchoiceUpdated> {
139        let mut status =
140            self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
141
142        while !status.is_valid() {
143            if status.is_invalid() {
144                error!(
145                    ?status,
146                    ?fork_choice_state,
147                    ?payload_attributes,
148                    "Invalid forkchoiceUpdatedV1 message",
149                );
150                panic!("Invalid forkchoiceUpdatedV1: {status:?}");
151            }
152            if status.is_syncing() {
153                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
154                    "invalid range: no canonical state found for parent of requested block",
155                ))
156            }
157            status =
158                self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
159        }
160
161        Ok(status)
162    }
163
164    async fn fork_choice_updated_v2_wait(
165        &self,
166        fork_choice_state: ForkchoiceState,
167        payload_attributes: Option<PayloadAttributes>,
168    ) -> TransportResult<ForkchoiceUpdated> {
169        let mut status =
170            self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
171
172        while !status.is_valid() {
173            if status.is_invalid() {
174                error!(
175                    ?status,
176                    ?fork_choice_state,
177                    ?payload_attributes,
178                    "Invalid forkchoiceUpdatedV2 message",
179                );
180                panic!("Invalid forkchoiceUpdatedV2: {status:?}");
181            }
182            if status.is_syncing() {
183                return Err(alloy_json_rpc::RpcError::UnsupportedFeature(
184                    "invalid range: no canonical state found for parent of requested block",
185                ))
186            }
187            status =
188                self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
189        }
190
191        Ok(status)
192    }
193
194    async fn fork_choice_updated_v3_wait(
195        &self,
196        fork_choice_state: ForkchoiceState,
197        payload_attributes: Option<PayloadAttributes>,
198    ) -> TransportResult<ForkchoiceUpdated> {
199        let mut status =
200            self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
201
202        while !status.is_valid() {
203            if status.is_invalid() {
204                error!(
205                    ?status,
206                    ?fork_choice_state,
207                    ?payload_attributes,
208                    "Invalid forkchoiceUpdatedV3 message",
209                );
210                panic!("Invalid forkchoiceUpdatedV3: {status:?}");
211            }
212            status =
213                self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
214        }
215
216        Ok(status)
217    }
218}
219
220/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
221/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
222///
223/// # Panics
224/// If the given payload is a V3 payload, but a parent beacon block root is provided as `None`.
225pub(crate) async fn call_new_payload<N, P: EngineApiValidWaitExt<N>>(
226    provider: P,
227    payload: ExecutionPayload,
228    parent_beacon_block_root: Option<B256>,
229    versioned_hashes: Vec<B256>,
230) -> TransportResult<EngineApiMessageVersion> {
231    match payload {
232        ExecutionPayload::V3(payload) => {
233            // We expect the caller
234            let parent_beacon_block_root = parent_beacon_block_root
235                .expect("parent_beacon_block_root is required for V3 payloads");
236            provider
237                .new_payload_v3_wait(payload, versioned_hashes, parent_beacon_block_root)
238                .await?;
239
240            Ok(EngineApiMessageVersion::V3)
241        }
242        ExecutionPayload::V2(payload) => {
243            let input = ExecutionPayloadInputV2 {
244                execution_payload: payload.payload_inner,
245                withdrawals: Some(payload.withdrawals),
246            };
247
248            provider.new_payload_v2_wait(input).await?;
249
250            Ok(EngineApiMessageVersion::V2)
251        }
252        ExecutionPayload::V1(payload) => {
253            provider.new_payload_v1_wait(payload).await?;
254
255            Ok(EngineApiMessageVersion::V1)
256        }
257    }
258}
259
260/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
261/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
262/// actual engine api message call.
263pub(crate) async fn call_forkchoice_updated<N, P: EngineApiValidWaitExt<N>>(
264    provider: P,
265    message_version: EngineApiMessageVersion,
266    forkchoice_state: ForkchoiceState,
267    payload_attributes: Option<PayloadAttributes>,
268) -> TransportResult<ForkchoiceUpdated> {
269    match message_version {
270        EngineApiMessageVersion::V4 => todo!("V4 payloads not supported yet"),
271        EngineApiMessageVersion::V3 => {
272            provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
273        }
274        EngineApiMessageVersion::V2 => {
275            provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
276        }
277        EngineApiMessageVersion::V1 => {
278            provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
279        }
280    }
281}