Skip to main content

reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B128, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14    ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15    ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16    PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::{CellCustody, NetworkInfo};
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25    validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26    PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BalProvider, BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34    sync::Arc,
35    time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
/// The Engine API response sender.
///
/// A [`oneshot::Sender`] that carries an [`EngineApiResult`] wrapping the `Ok` payload type.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;

/// The upper limit for payload bodies request.
///
/// Requests asking for more bodies than this are rejected with
/// [`EngineApiError::PayloadRequestTooLarge`].
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// The upper limit for blobs in `engine_getBlobsVx`.
const MAX_BLOB_LIMIT: usize = 128;
48
/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
///
/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
/// API processing. It can be reused by other non-L1 engine APIs that deviate from the L1 spec but
/// still follow the engine API model.
///
/// ## Implementers
///
/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
/// endpoints (e.g. opstack).
/// See also [`EngineApiServer`] implementation for this type which is the
/// L1 implementation.
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Reference-counted state; handlers clone this [`Arc`] to move it into spawned tasks.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71    /// Returns the configured chainspec.
72    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73        &self.inner.chain_spec
74    }
75
76    /// Returns the configured client version.
77    pub fn client_version(&self) -> &ClientVersionV1 {
78        &self.inner.client
79    }
80}
81
82impl<Provider, PayloadT, Pool, Validator, ChainSpec>
83    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
84where
85    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
86    PayloadT: PayloadTypes,
87    Pool: TransactionPool + 'static,
88    Validator: EngineApiValidator<PayloadT>,
89    ChainSpec: EthereumHardforks + Send + Sync + 'static,
90{
91    /// Create new instance of [`EngineApi`].
92    #[expect(clippy::too_many_arguments)]
93    pub fn new(
94        provider: Provider,
95        chain_spec: Arc<ChainSpec>,
96        beacon_consensus: ConsensusEngineHandle<PayloadT>,
97        payload_store: PayloadStore<PayloadT>,
98        tx_pool: Pool,
99        task_spawner: Runtime,
100        client: ClientVersionV1,
101        capabilities: EngineCapabilities,
102        validator: Validator,
103        accept_execution_requests_hash: bool,
104        network: impl NetworkInfo + 'static,
105    ) -> Self {
106        let cell_custody = network.cell_custody().clone();
107        let is_syncing = Arc::new(move || network.is_syncing());
108        let inner = Arc::new(EngineApiInner {
109            provider,
110            chain_spec,
111            beacon_consensus,
112            payload_store,
113            task_spawner,
114            metrics: EngineApiMetrics::default(),
115            client,
116            capabilities,
117            tx_pool,
118            validator,
119            accept_execution_requests_hash,
120            cell_custody,
121            is_syncing,
122        });
123        Self { inner }
124    }
125
126    /// Fetches the client version.
127    pub fn get_client_version_v1(
128        &self,
129        _client: ClientVersionV1,
130    ) -> EngineApiResult<Vec<ClientVersionV1>> {
131        Ok(vec![self.inner.client.clone()])
132    }
133
134    /// Fetches the timestamp of the payload with the given id.
135    async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
136        Ok(self
137            .inner
138            .payload_store
139            .payload_timestamp(payload_id)
140            .await
141            .ok_or(EngineApiError::UnknownPayload)??)
142    }
143
144    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
145    /// Caution: This should not accept the `withdrawals` field
146    pub async fn new_payload_v1(
147        &self,
148        payload: PayloadT::ExecutionData,
149    ) -> EngineApiResult<PayloadStatus> {
150        let payload_or_attrs = PayloadOrAttributes::<
151            '_,
152            PayloadT::ExecutionData,
153            PayloadT::PayloadAttributes,
154        >::from_execution_payload(&payload);
155
156        self.inner
157            .validator
158            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
159
160        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
161    }
162
163    /// Metered version of `new_payload_v1`.
164    pub async fn new_payload_v1_metered(
165        &self,
166        payload: PayloadT::ExecutionData,
167    ) -> EngineApiResult<PayloadStatus> {
168        let start = Instant::now();
169        let res = Self::new_payload_v1(self, payload).await;
170        let elapsed = start.elapsed();
171        self.inner.metrics.latency.new_payload_v1.record(elapsed);
172        res
173    }
174
175    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
176    pub async fn new_payload_v2(
177        &self,
178        payload: PayloadT::ExecutionData,
179    ) -> EngineApiResult<PayloadStatus> {
180        let payload_or_attrs = PayloadOrAttributes::<
181            '_,
182            PayloadT::ExecutionData,
183            PayloadT::PayloadAttributes,
184        >::from_execution_payload(&payload);
185        self.inner
186            .validator
187            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
188        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
189    }
190
191    /// Metered version of `new_payload_v2`.
192    pub async fn new_payload_v2_metered(
193        &self,
194        payload: PayloadT::ExecutionData,
195    ) -> EngineApiResult<PayloadStatus> {
196        let start = Instant::now();
197        let res = Self::new_payload_v2(self, payload).await;
198        let elapsed = start.elapsed();
199        self.inner.metrics.latency.new_payload_v2.record(elapsed);
200        res
201    }
202
203    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
204    pub async fn new_payload_v3(
205        &self,
206        payload: PayloadT::ExecutionData,
207    ) -> EngineApiResult<PayloadStatus> {
208        let payload_or_attrs = PayloadOrAttributes::<
209            '_,
210            PayloadT::ExecutionData,
211            PayloadT::PayloadAttributes,
212        >::from_execution_payload(&payload);
213        self.inner
214            .validator
215            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
216
217        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
218    }
219
220    /// Metrics version of `new_payload_v3`
221    pub async fn new_payload_v3_metered(
222        &self,
223        payload: PayloadT::ExecutionData,
224    ) -> RpcResult<PayloadStatus> {
225        let start = Instant::now();
226
227        let res = Self::new_payload_v3(self, payload).await;
228        let elapsed = start.elapsed();
229        self.inner.metrics.latency.new_payload_v3.record(elapsed);
230        Ok(res?)
231    }
232
233    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
234    pub async fn new_payload_v4(
235        &self,
236        payload: PayloadT::ExecutionData,
237    ) -> EngineApiResult<PayloadStatus> {
238        let payload_or_attrs = PayloadOrAttributes::<
239            '_,
240            PayloadT::ExecutionData,
241            PayloadT::PayloadAttributes,
242        >::from_execution_payload(&payload);
243        self.inner
244            .validator
245            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
246
247        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
248    }
249
250    /// Metrics version of `new_payload_v4`
251    pub async fn new_payload_v4_metered(
252        &self,
253        payload: PayloadT::ExecutionData,
254    ) -> RpcResult<PayloadStatus> {
255        let start = Instant::now();
256        let res = Self::new_payload_v4(self, payload).await;
257
258        let elapsed = start.elapsed();
259        self.inner.metrics.latency.new_payload_v4.record(elapsed);
260        Ok(res?)
261    }
262
263    /// Handler for `engine_newPayloadV5`
264    ///
265    /// Post-Amsterdam payload handler.
266    ///
267    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_newpayloadv5>
268    pub async fn new_payload_v5(
269        &self,
270        payload: PayloadT::ExecutionData,
271    ) -> EngineApiResult<PayloadStatus> {
272        let payload_or_attrs = PayloadOrAttributes::<
273            '_,
274            PayloadT::ExecutionData,
275            PayloadT::PayloadAttributes,
276        >::from_execution_payload(&payload);
277        self.inner
278            .validator
279            .validate_version_specific_fields(EngineApiMessageVersion::V5, payload_or_attrs)?;
280        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
281    }
282
283    /// Metrics version of `new_payload_v5`
284    pub async fn new_payload_v5_metered(
285        &self,
286        payload: PayloadT::ExecutionData,
287    ) -> RpcResult<PayloadStatus> {
288        let start = Instant::now();
289        let res = Self::new_payload_v5(self, payload).await;
290        let elapsed = start.elapsed();
291        self.inner.metrics.latency.new_payload_v5.record(elapsed);
292        Ok(res?)
293    }
294
295    /// Returns whether the engine accepts execution requests hash.
296    pub fn accept_execution_requests_hash(&self) -> bool {
297        self.inner.accept_execution_requests_hash
298    }
299}
300
301impl<Provider, EngineT, Pool, Validator, ChainSpec>
302    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
303where
304    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
305    EngineT: EngineTypes,
306    Pool: TransactionPool + 'static,
307    Validator: EngineApiValidator<EngineT>,
308    ChainSpec: EthereumHardforks + Send + Sync + 'static,
309{
310    /// Sends a message to the beacon consensus engine to update the fork choice _without_
311    /// withdrawals.
312    ///
313    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
314    ///
315    /// Caution: This should not accept the `withdrawals` field
316    pub async fn fork_choice_updated_v1(
317        &self,
318        state: ForkchoiceState,
319        payload_attrs: Option<EngineT::PayloadAttributes>,
320    ) -> EngineApiResult<ForkchoiceUpdated> {
321        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
322            .await
323    }
324
325    /// Metrics version of `fork_choice_updated_v1`
326    pub async fn fork_choice_updated_v1_metered(
327        &self,
328        state: ForkchoiceState,
329        payload_attrs: Option<EngineT::PayloadAttributes>,
330    ) -> EngineApiResult<ForkchoiceUpdated> {
331        let start = Instant::now();
332        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
333        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
334        res
335    }
336
337    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
338    /// but only _after_ shanghai.
339    ///
340    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
341    pub async fn fork_choice_updated_v2(
342        &self,
343        state: ForkchoiceState,
344        payload_attrs: Option<EngineT::PayloadAttributes>,
345    ) -> EngineApiResult<ForkchoiceUpdated> {
346        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
347            .await
348    }
349
350    /// Metrics version of `fork_choice_updated_v2`
351    pub async fn fork_choice_updated_v2_metered(
352        &self,
353        state: ForkchoiceState,
354        payload_attrs: Option<EngineT::PayloadAttributes>,
355    ) -> EngineApiResult<ForkchoiceUpdated> {
356        let start = Instant::now();
357        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
358        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
359        res
360    }
361
362    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
363    /// but only _after_ cancun.
364    ///
365    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
366    pub async fn fork_choice_updated_v3(
367        &self,
368        state: ForkchoiceState,
369        payload_attrs: Option<EngineT::PayloadAttributes>,
370    ) -> EngineApiResult<ForkchoiceUpdated> {
371        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
372            .await
373    }
374
375    /// Metrics version of `fork_choice_updated_v3`
376    pub async fn fork_choice_updated_v3_metered(
377        &self,
378        state: ForkchoiceState,
379        payload_attrs: Option<EngineT::PayloadAttributes>,
380    ) -> EngineApiResult<ForkchoiceUpdated> {
381        let start = Instant::now();
382        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
383        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
384        res
385    }
386
387    /// Sends a message to the beacon consensus engine to update the fork choice _with_ slot number,
388    /// but only _after_ amsterdam.
389    ///
390    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_forkchoiceupdatedv4>
391    pub async fn fork_choice_updated_v4(
392        &self,
393        state: ForkchoiceState,
394        payload_attrs: Option<EngineT::PayloadAttributes>,
395        custody_columns: Option<B128>,
396    ) -> EngineApiResult<ForkchoiceUpdated> {
397        if let Some(custody_columns) = custody_columns {
398            self.inner.cell_custody.set(custody_columns);
399        }
400        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V4, state, payload_attrs)
401            .await
402    }
403
404    /// Metrics version of `fork_choice_updated_v4`
405    pub async fn fork_choice_updated_v4_metered(
406        &self,
407        state: ForkchoiceState,
408        payload_attrs: Option<EngineT::PayloadAttributes>,
409        custody_columns: Option<B128>,
410    ) -> EngineApiResult<ForkchoiceUpdated> {
411        let start = Instant::now();
412        let res = Self::fork_choice_updated_v4(self, state, payload_attrs, custody_columns).await;
413        self.inner.metrics.latency.fork_choice_updated_v4.record(start.elapsed());
414        res
415    }
416
417    /// Helper function for retrieving the build payload by id.
418    async fn get_built_payload(
419        &self,
420        payload_id: PayloadId,
421    ) -> EngineApiResult<EngineT::BuiltPayload> {
422        self.inner
423            .payload_store
424            .resolve(payload_id)
425            .await
426            .ok_or(EngineApiError::UnknownPayload)?
427            .map_err(|_| EngineApiError::UnknownPayload)
428    }
429
430    /// Helper function for validating the payload timestamp and retrieving & converting the payload
431    /// into desired envelope.
432    async fn get_payload_inner<R>(
433        &self,
434        payload_id: PayloadId,
435        version: EngineApiMessageVersion,
436    ) -> EngineApiResult<R>
437    where
438        EngineT::BuiltPayload: TryInto<R>,
439    {
440        // Validate timestamp according to engine rules
441        // Enforces Osaka restrictions on `getPayloadV4`.
442        let timestamp = self.get_payload_timestamp(payload_id).await?;
443        validate_payload_timestamp(
444            &self.inner.chain_spec,
445            version,
446            timestamp,
447            MessageValidationKind::GetPayload,
448        )?;
449
450        // Now resolve the payload
451        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
452            warn!(?version, "could not transform built payload");
453            EngineApiError::UnknownPayload
454        })
455    }
456
457    /// Returns the most recent version of the payload that is available in the corresponding
458    /// payload build process at the time of receiving this call.
459    ///
460    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
461    ///
462    /// Caution: This should not return the `withdrawals` field
463    ///
464    /// Note:
465    /// > Provider software MAY stop the corresponding build process after serving this call.
466    pub async fn get_payload_v1(
467        &self,
468        payload_id: PayloadId,
469    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
470        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
471            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
472            EngineApiError::UnknownPayload
473        })
474    }
475
476    /// Metrics version of `get_payload_v1`
477    pub async fn get_payload_v1_metered(
478        &self,
479        payload_id: PayloadId,
480    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
481        let start = Instant::now();
482        let res = Self::get_payload_v1(self, payload_id).await;
483        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
484        res
485    }
486
487    /// Returns the most recent version of the payload that is available in the corresponding
488    /// payload build process at the time of receiving this call.
489    ///
490    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
491    ///
492    /// Note:
493    /// > Provider software MAY stop the corresponding build process after serving this call.
494    pub async fn get_payload_v2(
495        &self,
496        payload_id: PayloadId,
497    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
498        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
499    }
500
501    /// Metrics version of `get_payload_v2`
502    pub async fn get_payload_v2_metered(
503        &self,
504        payload_id: PayloadId,
505    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
506        let start = Instant::now();
507        let res = Self::get_payload_v2(self, payload_id).await;
508        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
509        res
510    }
511
512    /// Returns the most recent version of the payload that is available in the corresponding
513    /// payload build process at the time of receiving this call.
514    ///
515    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
516    ///
517    /// Note:
518    /// > Provider software MAY stop the corresponding build process after serving this call.
519    pub async fn get_payload_v3(
520        &self,
521        payload_id: PayloadId,
522    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
523        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
524    }
525
526    /// Metrics version of `get_payload_v3`
527    pub async fn get_payload_v3_metered(
528        &self,
529        payload_id: PayloadId,
530    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
531        let start = Instant::now();
532        let res = Self::get_payload_v3(self, payload_id).await;
533        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
534        res
535    }
536
537    /// Returns the most recent version of the payload that is available in the corresponding
538    /// payload build process at the time of receiving this call.
539    ///
540    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_getpayloadv4>
541    ///
542    /// Note:
543    /// > Provider software MAY stop the corresponding build process after serving this call.
544    pub async fn get_payload_v4(
545        &self,
546        payload_id: PayloadId,
547    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
548        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
549    }
550
551    /// Metrics version of `get_payload_v4`
552    pub async fn get_payload_v4_metered(
553        &self,
554        payload_id: PayloadId,
555    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
556        let start = Instant::now();
557        let res = Self::get_payload_v4(self, payload_id).await;
558        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
559        res
560    }
561
562    /// Handler for `engine_getPayloadV5`
563    ///
564    /// Returns the most recent version of the payload that is available in the corresponding
565    /// payload build process at the time of receiving this call.
566    ///
567    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
568    ///
569    /// Note:
570    /// > Provider software MAY stop the corresponding build process after serving this call.
571    pub async fn get_payload_v5(
572        &self,
573        payload_id: PayloadId,
574    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
575        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
576    }
577
578    /// Metrics version of `get_payload_v5`
579    pub async fn get_payload_v5_metered(
580        &self,
581        payload_id: PayloadId,
582    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
583        let start = Instant::now();
584        let res = Self::get_payload_v5(self, payload_id).await;
585        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
586        res
587    }
588
589    /// Handler for `engine_getPayloadV6`
590    ///
591    /// Post-Amsterdam payload handler that includes Block Access Lists (BAL).
592    ///
593    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_getpayloadv6>
594    pub async fn get_payload_v6(
595        &self,
596        payload_id: PayloadId,
597    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
598        self.get_payload_inner(payload_id, EngineApiMessageVersion::V6).await
599    }
600
601    /// Metrics version of `get_payload_v6`
602    pub async fn get_payload_v6_metered(
603        &self,
604        payload_id: PayloadId,
605    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
606        let start = Instant::now();
607        let res = Self::get_payload_v6(self, payload_id).await;
608        self.inner.metrics.latency.get_payload_v6.record(start.elapsed());
609        res
610    }
611
612    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
613    /// blocks and returns the mapped payload bodies.
614    pub async fn get_payload_bodies_by_range_with<F, R>(
615        &self,
616        start: BlockNumber,
617        count: u64,
618        f: F,
619    ) -> EngineApiResult<Vec<Option<R>>>
620    where
621        F: Fn(Provider::Block) -> R + Send + 'static,
622        R: Send + 'static,
623    {
624        let (tx, rx) = oneshot::channel();
625        let inner = self.inner.clone();
626
627        self.inner.task_spawner.spawn_blocking_task(async move {
628            if count > MAX_PAYLOAD_BODIES_LIMIT {
629                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
630                return;
631            }
632
633            if start == 0 || count == 0 {
634                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
635                return;
636            }
637
638            let mut result = Vec::with_capacity(count as usize);
639
640            // -1 so range is inclusive
641            let mut end = start.saturating_add(count - 1);
642
643            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
644            // truncate the end if it's greater than the last block
645            if let Ok(best_block) = inner.provider.best_block_number()
646                && end > best_block {
647                    end = best_block;
648                }
649
650            // Check if the requested range starts before the earliest available block due to pruning/expiry
651            let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
652            for num in start..=end {
653                if num < earliest_block {
654                    result.push(None);
655                    continue;
656                }
657                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
658                match block_result {
659                    Ok(block) => {
660                        result.push(block.map(&f));
661                    }
662                    Err(err) => {
663                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
664                        return;
665                    }
666                };
667            }
668            tx.send(Ok(result)).ok();
669        });
670
671        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
672    }
673
674    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
675    /// blocks.
676    ///
677    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
678    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
679    /// adversarial.
680    ///
681    /// Implementers should take care when acting on the input to this method, specifically
682    /// ensuring that the range is limited properly, and that the range boundaries are computed
683    /// correctly and without panics.
684    pub async fn get_payload_bodies_by_range_v1(
685        &self,
686        start: BlockNumber,
687        count: u64,
688    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
689        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
690            transactions: block.body().encoded_2718_transactions(),
691            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
692        })
693        .await
694    }
695
696    /// Metrics version of `get_payload_bodies_by_range_v1`
697    pub async fn get_payload_bodies_by_range_v1_metered(
698        &self,
699        start: BlockNumber,
700        count: u64,
701    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
702        let start_time = Instant::now();
703        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
704        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
705        res
706    }
707
708    /// Returns the execution payload bodies by the range (V2).
709    ///
710    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
711    pub async fn get_payload_bodies_by_range_v2(
712        &self,
713        start: BlockNumber,
714        count: u64,
715    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
716        let mut payload_bodies = self
717            .get_payload_bodies_by_range_with(start, count, |block| {
718                let block_hash = block.header().hash_slow();
719                (
720                    block_hash,
721                    ExecutionPayloadBodyV2 {
722                        transactions: block.body().encoded_2718_transactions(),
723                        withdrawals: block
724                            .body()
725                            .withdrawals()
726                            .cloned()
727                            .map(Withdrawals::into_inner),
728                        block_access_list: None,
729                    },
730                )
731            })
732            .await?;
733
734        let block_hashes = payload_bodies
735            .iter()
736            .filter_map(|payload_body| payload_body.as_ref().map(|(block_hash, _)| *block_hash))
737            .collect::<Vec<_>>();
738        let block_access_lists = self.get_block_access_lists_by_hashes(block_hashes).await?;
739
740        for (payload_body, block_access_list) in
741            payload_bodies.iter_mut().filter_map(Option::as_mut).zip(block_access_lists)
742        {
743            payload_body.1.block_access_list = block_access_list;
744        }
745
746        Ok(payload_bodies
747            .into_iter()
748            .map(|payload_body| payload_body.map(|(_, payload_body)| payload_body))
749            .collect())
750    }
751
752    /// Metrics version of `get_payload_bodies_by_range_v2`
753    pub async fn get_payload_bodies_by_range_v2_metered(
754        &self,
755        start: BlockNumber,
756        count: u64,
757    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
758        let start_time = Instant::now();
759        let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
760        self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
761        res
762    }
763
764    /// Called to retrieve execution payload bodies by hashes.
765    pub async fn get_payload_bodies_by_hash_with<F, R>(
766        &self,
767        hashes: Vec<BlockHash>,
768        f: F,
769    ) -> EngineApiResult<Vec<Option<R>>>
770    where
771        F: Fn(Provider::Block) -> R + Send + 'static,
772        R: Send + 'static,
773    {
774        let len = hashes.len() as u64;
775        if len > MAX_PAYLOAD_BODIES_LIMIT {
776            return Err(EngineApiError::PayloadRequestTooLarge { len });
777        }
778
779        let (tx, rx) = oneshot::channel();
780        let inner = self.inner.clone();
781
782        self.inner.task_spawner.spawn_blocking_task(async move {
783            let mut result = Vec::with_capacity(hashes.len());
784            for hash in hashes {
785                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
786                match block_result {
787                    Ok(block) => {
788                        result.push(block.map(&f));
789                    }
790                    Err(err) => {
791                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
792                        return;
793                    }
794                }
795            }
796            tx.send(Ok(result)).ok();
797        });
798
799        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
800    }
801
802    async fn get_block_access_lists_by_hashes(
803        &self,
804        hashes: Vec<BlockHash>,
805    ) -> EngineApiResult<Vec<Option<Bytes>>> {
806        let len = hashes.len() as u64;
807        if len > MAX_PAYLOAD_BODIES_LIMIT {
808            return Err(EngineApiError::PayloadRequestTooLarge { len });
809        }
810
811        let (tx, rx) = oneshot::channel();
812        let bal_store = self.inner.provider.bal_store().clone();
813
814        self.inner.task_spawner.spawn_blocking_task(async move {
815            tx.send(
816                bal_store
817                    .get_by_hashes(&hashes)
818                    .map_err(|err| EngineApiError::Internal(Box::new(err))),
819            )
820            .ok();
821        });
822
823        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
824    }
825
826    /// Called to retrieve execution payload bodies by hashes.
827    pub async fn get_payload_bodies_by_hash_v1(
828        &self,
829        hashes: Vec<BlockHash>,
830    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
831        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
832            transactions: block.body().encoded_2718_transactions(),
833            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
834        })
835        .await
836    }
837
838    /// Metrics version of `get_payload_bodies_by_hash_v1`
839    pub async fn get_payload_bodies_by_hash_v1_metered(
840        &self,
841        hashes: Vec<BlockHash>,
842    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
843        let start = Instant::now();
844        let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
845        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
846        res
847    }
848
849    /// Called to retrieve execution payload bodies by hashes (V2).
850    ///
851    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
852    pub async fn get_payload_bodies_by_hash_v2(
853        &self,
854        hashes: Vec<BlockHash>,
855    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
856        let payload_bodies =
857            self.get_payload_bodies_by_hash_with(hashes.clone(), |block| ExecutionPayloadBodyV2 {
858                transactions: block.body().encoded_2718_transactions(),
859                withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
860                block_access_list: None,
861            });
862        let block_access_lists = self.get_block_access_lists_by_hashes(hashes);
863        let (mut payload_bodies, block_access_lists) =
864            tokio::try_join!(payload_bodies, block_access_lists)?;
865
866        for (payload_body, block_access_list) in payload_bodies.iter_mut().zip(block_access_lists) {
867            if let Some(payload_body) = payload_body {
868                payload_body.block_access_list = block_access_list;
869            }
870        }
871
872        Ok(payload_bodies)
873    }
874
875    /// Metrics version of `get_payload_bodies_by_hash_v2`
876    pub async fn get_payload_bodies_by_hash_v2_metered(
877        &self,
878        hashes: Vec<BlockHash>,
879    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
880        let start = Instant::now();
881        let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
882        self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
883        res
884    }
885
886    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
887    /// update.
888    ///
889    /// The payload attributes will be validated according to the engine API rules for the given
890    /// message version:
891    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
892    ///   validated according to the Paris rules.
893    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
894    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
895    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
896    ///
897    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
898    ///   validated according to the Cancun rules.
899    async fn validate_and_execute_forkchoice(
900        &self,
901        version: EngineApiMessageVersion,
902        state: ForkchoiceState,
903        payload_attrs: Option<EngineT::PayloadAttributes>,
904    ) -> EngineApiResult<ForkchoiceUpdated> {
905        if let Some(ref attrs) = payload_attrs {
906            let attr_validation_res =
907                self.inner.validator.ensure_well_formed_attributes(version, attrs);
908
909            // From the engine API spec:
910            //
911            // Client software MUST ensure that payloadAttributes.timestamp is greater than
912            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
913            // isn't held client software MUST respond with -38003: Invalid payload attributes and
914            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
915            // update MUST NOT be rolled back.
916            //
917            // NOTE: This also applies to cancun/shanghai-specific payload attributes.
918            if let Err(err) = attr_validation_res {
919                let fcu_res = self.inner.beacon_consensus.fork_choice_updated(state, None).await?;
920                if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
921                    return Ok(fcu_res)
922                }
923                return Err(err.into())
924            }
925        }
926
927        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
928    }
929
930    /// Returns reference to supported capabilities.
931    pub fn capabilities(&self) -> &EngineCapabilities {
932        &self.inner.capabilities
933    }
934
935    fn get_blobs_v1(
936        &self,
937        versioned_hashes: Vec<B256>,
938    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
939        // Only allow this method before Osaka fork
940        let current_timestamp =
941            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
942        if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
943            return Err(EngineApiError::EngineObjectValidationError(
944                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
945            ));
946        }
947
948        if versioned_hashes.len() > MAX_BLOB_LIMIT {
949            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
950        }
951
952        self.inner
953            .tx_pool
954            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
955            .map_err(|err| EngineApiError::Internal(Box::new(err)))
956    }
957
958    /// Metered version of `get_blobs_v1`.
959    pub fn get_blobs_v1_metered(
960        &self,
961        versioned_hashes: Vec<B256>,
962    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
963        let hashes_len = versioned_hashes.len();
964        let start = Instant::now();
965        let res = Self::get_blobs_v1(self, versioned_hashes);
966        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
967
968        if let Ok(blobs) = &res {
969            let blobs_found = blobs.iter().flatten().count();
970            let blobs_missed = hashes_len - blobs_found;
971
972            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
973            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
974        }
975
976        res
977    }
978
979    fn get_blobs_v2(
980        &self,
981        versioned_hashes: Vec<B256>,
982    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
983        // Check if Osaka fork is active
984        let current_timestamp =
985            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
986        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
987            return Err(EngineApiError::EngineObjectValidationError(
988                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
989            ));
990        }
991
992        if versioned_hashes.len() > MAX_BLOB_LIMIT {
993            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
994        }
995
996        self.inner
997            .tx_pool
998            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
999            .map_err(|err| EngineApiError::Internal(Box::new(err)))
1000    }
1001
1002    fn get_blobs_v3(
1003        &self,
1004        versioned_hashes: Vec<B256>,
1005    ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1006        // Check if Osaka fork is active
1007        let current_timestamp =
1008            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
1009        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
1010            return Err(EngineApiError::EngineObjectValidationError(
1011                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1012            ));
1013        }
1014
1015        if versioned_hashes.len() > MAX_BLOB_LIMIT {
1016            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1017        }
1018
1019        // Spec requires returning `null` if syncing.
1020        if (*self.inner.is_syncing)() {
1021            return Ok(None)
1022        }
1023
1024        self.inner
1025            .tx_pool
1026            .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
1027            .map(Some)
1028            .map_err(|err| EngineApiError::Internal(Box::new(err)))
1029    }
1030
1031    fn get_blobs_v4(
1032        &self,
1033        versioned_hashes: Vec<B256>,
1034        indices_bitarray: B128,
1035    ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1036        let current_timestamp =
1037            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
1038        if !self.inner.chain_spec.is_amsterdam_active_at_timestamp(current_timestamp) {
1039            return Err(EngineApiError::EngineObjectValidationError(
1040                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1041            ));
1042        }
1043
1044        if versioned_hashes.len() > MAX_BLOB_LIMIT {
1045            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1046        }
1047
1048        // Spec requires returning `null` if syncing.
1049        if (*self.inner.is_syncing)() {
1050            return Ok(None)
1051        }
1052
1053        self.inner
1054            .tx_pool
1055            .get_blobs_for_versioned_hashes_v4(&versioned_hashes, indices_bitarray)
1056            .map(Some)
1057            .map_err(|err| EngineApiError::Internal(Box::new(err)))
1058    }
1059
1060    /// Metered version of `get_blobs_v2`.
1061    pub fn get_blobs_v2_metered(
1062        &self,
1063        versioned_hashes: Vec<B256>,
1064    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
1065        let hashes_len = versioned_hashes.len();
1066        let start = Instant::now();
1067        let res = Self::get_blobs_v2(self, versioned_hashes);
1068        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
1069
1070        if let Ok(blobs) = &res {
1071            let blobs_found = blobs.iter().flatten().count();
1072
1073            self.inner
1074                .metrics
1075                .blob_metrics
1076                .get_blobs_requests_blobs_total
1077                .increment(hashes_len as u64);
1078            self.inner
1079                .metrics
1080                .blob_metrics
1081                .get_blobs_requests_blobs_in_blobpool_total
1082                .increment(blobs_found as u64);
1083
1084            if blobs_found == hashes_len {
1085                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
1086            } else {
1087                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1088            }
1089        } else {
1090            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1091        }
1092
1093        res
1094    }
1095
1096    /// Metered version of `get_blobs_v3`.
1097    pub fn get_blobs_v3_metered(
1098        &self,
1099        versioned_hashes: Vec<B256>,
1100    ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1101        let hashes_len = versioned_hashes.len();
1102        let start = Instant::now();
1103        let res = Self::get_blobs_v3(self, versioned_hashes);
1104        self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
1105
1106        if let Ok(Some(blobs)) = &res {
1107            let blobs_found = blobs.iter().flatten().count();
1108            let blobs_missed = hashes_len - blobs_found;
1109
1110            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1111            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1112        }
1113
1114        res
1115    }
1116
1117    /// Metered version of `get_blobs_v4`.
1118    pub fn get_blobs_v4_metered(
1119        &self,
1120        versioned_hashes: Vec<B256>,
1121        indices_bitarray: B128,
1122    ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1123        let hashes_len = versioned_hashes.len();
1124        let start = Instant::now();
1125        let res = Self::get_blobs_v4(self, versioned_hashes, indices_bitarray);
1126        self.inner.metrics.latency.get_blobs_v4.record(start.elapsed());
1127
1128        if let Ok(Some(blobs)) = &res {
1129            let blobs_found = blobs.iter().flatten().count();
1130            let blobs_missed = hashes_len - blobs_found;
1131
1132            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1133            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1134        }
1135
1136        res
1137    }
1138}
1139
1140// This is the concrete ethereum engine API implementation.
1141#[async_trait]
1142impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
1143    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1144where
1145    Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
1146    EngineT: EngineTypes<ExecutionData = ExecutionData>,
1147    Pool: TransactionPool + 'static,
1148    Validator: EngineApiValidator<EngineT>,
1149    ChainSpec: EthereumHardforks + Send + Sync + 'static,
1150{
1151    /// Handler for `engine_newPayloadV1`
1152    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
1153    /// Caution: This should not accept the `withdrawals` field
1154    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
1155        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
1156        let payload =
1157            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
1158        Ok(self.new_payload_v1_metered(payload).await?)
1159    }
1160
1161    /// Handler for `engine_newPayloadV2`
1162    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
1163    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
1164        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
1165        let payload = ExecutionData {
1166            payload: payload.into_payload(),
1167            sidecar: ExecutionPayloadSidecar::none(),
1168        };
1169
1170        Ok(self.new_payload_v2_metered(payload).await?)
1171    }
1172
1173    /// Handler for `engine_newPayloadV3`
1174    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
1175    async fn new_payload_v3(
1176        &self,
1177        payload: ExecutionPayloadV3,
1178        versioned_hashes: Vec<B256>,
1179        parent_beacon_block_root: B256,
1180    ) -> RpcResult<PayloadStatus> {
1181        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
1182        let payload = ExecutionData {
1183            payload: payload.into(),
1184            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
1185                versioned_hashes,
1186                parent_beacon_block_root,
1187            }),
1188        };
1189
1190        Ok(self.new_payload_v3_metered(payload).await?)
1191    }
1192
1193    /// Handler for `engine_newPayloadV4`
1194    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
1195    async fn new_payload_v4(
1196        &self,
1197        payload: ExecutionPayloadV3,
1198        versioned_hashes: Vec<B256>,
1199        parent_beacon_block_root: B256,
1200        requests: RequestsOrHash,
1201    ) -> RpcResult<PayloadStatus> {
1202        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1203
1204        // Accept requests as a hash only if it is explicitly allowed
1205        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1206            return Err(EngineApiError::UnexpectedRequestsHash.into());
1207        }
1208
1209        let payload = ExecutionData {
1210            payload: payload.into(),
1211            sidecar: ExecutionPayloadSidecar::v4(
1212                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1213                PraguePayloadFields { requests },
1214            ),
1215        };
1216
1217        Ok(self.new_payload_v4_metered(payload).await?)
1218    }
1219
1220    /// Handler for `engine_newPayloadV5`
1221    ///
1222    /// Post Amsterdam payload handler.
1223    ///
1224    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_newpayloadv5>
1225    async fn new_payload_v5(
1226        &self,
1227        payload: ExecutionPayloadV4,
1228        versioned_hashes: Vec<B256>,
1229        parent_beacon_block_root: B256,
1230        requests: RequestsOrHash,
1231    ) -> RpcResult<PayloadStatus> {
1232        trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1233        // Accept requests as a hash only if it is explicitly allowed.
1234        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1235            return Err(EngineApiError::UnexpectedRequestsHash.into());
1236        }
1237
1238        let payload = ExecutionData {
1239            payload: payload.into(),
1240            sidecar: ExecutionPayloadSidecar::v4(
1241                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1242                PraguePayloadFields { requests },
1243            ),
1244        };
1245
1246        Ok(self.new_payload_v5_metered(payload).await?)
1247    }
1248
1249    /// Handler for `engine_forkchoiceUpdatedV1`
1250    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
1251    ///
1252    /// Caution: This should not accept the `withdrawals` field
1253    async fn fork_choice_updated_v1(
1254        &self,
1255        fork_choice_state: ForkchoiceState,
1256        payload_attributes: Option<EngineT::PayloadAttributes>,
1257    ) -> RpcResult<ForkchoiceUpdated> {
1258        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1259        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1260    }
1261
1262    /// Handler for `engine_forkchoiceUpdatedV2`
1263    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
1264    async fn fork_choice_updated_v2(
1265        &self,
1266        fork_choice_state: ForkchoiceState,
1267        payload_attributes: Option<EngineT::PayloadAttributes>,
1268    ) -> RpcResult<ForkchoiceUpdated> {
1269        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1270        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1271    }
1272
1273    /// Handler for `engine_forkchoiceUpdatedV3`
1274    ///
1275    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
1276    async fn fork_choice_updated_v3(
1277        &self,
1278        fork_choice_state: ForkchoiceState,
1279        payload_attributes: Option<EngineT::PayloadAttributes>,
1280    ) -> RpcResult<ForkchoiceUpdated> {
1281        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1282        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1283    }
1284
1285    /// Handler for `engine_forkchoiceUpdatedV4`
1286    ///
1287    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_forkchoiceupdatedv4>
1288    async fn fork_choice_updated_v4(
1289        &self,
1290        fork_choice_state: ForkchoiceState,
1291        payload_attributes: Option<EngineT::PayloadAttributes>,
1292        custody_columns: Option<B128>,
1293    ) -> RpcResult<ForkchoiceUpdated> {
1294        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV4");
1295        Ok(self
1296            .fork_choice_updated_v4_metered(fork_choice_state, payload_attributes, custody_columns)
1297            .await?)
1298    }
1299
1300    /// Handler for `engine_getPayloadV1`
1301    ///
1302    /// Returns the most recent version of the payload that is available in the corresponding
1303    /// payload build process at the time of receiving this call.
1304    ///
1305    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
1306    ///
1307    /// Caution: This should not return the `withdrawals` field
1308    ///
1309    /// Note:
1310    /// > Provider software MAY stop the corresponding build process after serving this call.
1311    async fn get_payload_v1(
1312        &self,
1313        payload_id: PayloadId,
1314    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1315        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1316        Ok(self.get_payload_v1_metered(payload_id).await?)
1317    }
1318
1319    /// Handler for `engine_getPayloadV2`
1320    ///
1321    /// Returns the most recent version of the payload that is available in the corresponding
1322    /// payload build process at the time of receiving this call.
1323    ///
1324    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
1325    ///
1326    /// Note:
1327    /// > Provider software MAY stop the corresponding build process after serving this call.
1328    async fn get_payload_v2(
1329        &self,
1330        payload_id: PayloadId,
1331    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1332        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1333        Ok(self.get_payload_v2_metered(payload_id).await?)
1334    }
1335
1336    /// Handler for `engine_getPayloadV3`
1337    ///
1338    /// Returns the most recent version of the payload that is available in the corresponding
1339    /// payload build process at the time of receiving this call.
1340    ///
1341    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
1342    ///
1343    /// Note:
1344    /// > Provider software MAY stop the corresponding build process after serving this call.
1345    async fn get_payload_v3(
1346        &self,
1347        payload_id: PayloadId,
1348    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1349        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1350        Ok(self.get_payload_v3_metered(payload_id).await?)
1351    }
1352
1353    /// Handler for `engine_getPayloadV4`
1354    ///
1355    /// Returns the most recent version of the payload that is available in the corresponding
1356    /// payload build process at the time of receiving this call.
1357    ///
1358    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1359    ///
1360    /// Note:
1361    /// > Provider software MAY stop the corresponding build process after serving this call.
1362    async fn get_payload_v4(
1363        &self,
1364        payload_id: PayloadId,
1365    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1366        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1367        Ok(self.get_payload_v4_metered(payload_id).await?)
1368    }
1369
1370    /// Handler for `engine_getPayloadV5`
1371    ///
1372    /// Returns the most recent version of the payload that is available in the corresponding
1373    /// payload build process at the time of receiving this call.
1374    ///
1375    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1376    ///
1377    /// Note:
1378    /// > Provider software MAY stop the corresponding build process after serving this call.
1379    async fn get_payload_v5(
1380        &self,
1381        payload_id: PayloadId,
1382    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1383        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1384        Ok(self.get_payload_v5_metered(payload_id).await?)
1385    }
1386
1387    /// Handler for `engine_getPayloadV6`
1388    ///
1389    /// Post-Amsterdam payload handler that includes Block Access Lists (BAL).
1390    ///
1391    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_getpayloadv6>
1392    async fn get_payload_v6(
1393        &self,
1394        payload_id: PayloadId,
1395    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1396        trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1397        Ok(self.get_payload_v6_metered(payload_id).await?)
1398    }
1399
1400    /// Handler for `engine_getPayloadBodiesByHashV1`
1401    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1402    async fn get_payload_bodies_by_hash_v1(
1403        &self,
1404        block_hashes: Vec<BlockHash>,
1405    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1406        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1407        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1408    }
1409
1410    /// Handler for `engine_getPayloadBodiesByHashV2`
1411    ///
1412    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
1413    ///
1414    /// See also <https://eips.ethereum.org/EIPS/eip-7928>
1415    async fn get_payload_bodies_by_hash_v2(
1416        &self,
1417        block_hashes: Vec<BlockHash>,
1418    ) -> RpcResult<ExecutionPayloadBodiesV2> {
1419        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1420        Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1421    }
1422
1423    /// Handler for `engine_getPayloadBodiesByRangeV1`
1424    ///
1425    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1426    ///
1427    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1428    /// blocks.
1429    ///
1430    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
1431    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1432    /// adversarial.
1433    ///
1434    /// Implementers should take care when acting on the input to this method, specifically
1435    /// ensuring that the range is limited properly, and that the range boundaries are computed
1436    /// correctly and without panics.
1437    ///
1438    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1439    async fn get_payload_bodies_by_range_v1(
1440        &self,
1441        start: U64,
1442        count: U64,
1443    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1444        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1445        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1446    }
1447
1448    /// Handler for `engine_getPayloadBodiesByRangeV2`
1449    ///
1450    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
1451    ///
1452    /// See also <https://eips.ethereum.org/EIPS/eip-7928>
1453    async fn get_payload_bodies_by_range_v2(
1454        &self,
1455        start: U64,
1456        count: U64,
1457    ) -> RpcResult<ExecutionPayloadBodiesV2> {
1458        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1459        Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1460    }
1461
1462    /// Handler for `engine_getClientVersionV1`
1463    ///
1464    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1465    async fn get_client_version_v1(
1466        &self,
1467        client: ClientVersionV1,
1468    ) -> RpcResult<Vec<ClientVersionV1>> {
1469        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1470        Ok(Self::get_client_version_v1(self, client)?)
1471    }
1472
1473    /// Handler for `engine_exchangeCapabilitiesV1`
1474    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1475    async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1476        trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1477
1478        let el_caps = self.capabilities();
1479        el_caps.log_capability_mismatches(&capabilities);
1480
1481        Ok(el_caps.list())
1482    }
1483
1484    async fn get_blobs_v1(
1485        &self,
1486        versioned_hashes: Vec<B256>,
1487    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1488        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1489        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1490    }
1491
1492    async fn get_blobs_v2(
1493        &self,
1494        versioned_hashes: Vec<B256>,
1495    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1496        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1497        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1498    }
1499
1500    async fn get_blobs_v3(
1501        &self,
1502        versioned_hashes: Vec<B256>,
1503    ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1504        trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1505        Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1506    }
1507
1508    async fn get_blobs_v4(
1509        &self,
1510        versioned_hashes: Vec<B256>,
1511        indices_bitarray: B128,
1512    ) -> RpcResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1513        trace!(target: "rpc::engine", "Serving engine_getBlobsV4");
1514        Ok(self.get_blobs_v4_metered(versioned_hashes, indices_bitarray)?)
1515    }
1516}
1517
1518impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1519    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1520where
1521    EngineT: EngineTypes,
1522    Self: EngineApiServer<EngineT>,
1523{
1524    fn into_rpc_module(self) -> RpcModule<()> {
1525        EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1526    }
1527}
1528
1529impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1530    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1531where
1532    PayloadT: PayloadTypes,
1533{
1534    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1535        f.debug_struct("EngineApi").finish_non_exhaustive()
1536    }
1537}
1538
1539impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1540    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1541where
1542    PayloadT: PayloadTypes,
1543{
1544    fn clone(&self) -> Self {
1545        Self { inner: Arc::clone(&self.inner) }
1546    }
1547}
1548
1549/// The container type for the engine API internals.
1550struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1551    /// The provider to interact with the chain.
1552    provider: Provider,
1553    /// Consensus configuration
1554    chain_spec: Arc<ChainSpec>,
1555    /// The channel to send messages to the beacon consensus engine.
1556    beacon_consensus: ConsensusEngineHandle<PayloadT>,
1557    /// The type that can communicate with the payload service to retrieve payloads.
1558    payload_store: PayloadStore<PayloadT>,
1559    /// For spawning and executing async tasks
1560    task_spawner: Runtime,
1561    /// The latency and response type metrics for engine api calls
1562    metrics: EngineApiMetrics,
1563    /// Identification of the execution client used by the consensus client
1564    client: ClientVersionV1,
1565    /// The list of all supported Engine capabilities available over the engine endpoint.
1566    capabilities: EngineCapabilities,
1567    /// Transaction pool.
1568    tx_pool: Pool,
1569    /// Engine validator.
1570    validator: Validator,
1571    accept_execution_requests_hash: bool,
1572    /// Shared blob cell custody bitmap.
1573    cell_custody: CellCustody,
1574    /// Returns `true` if the node is currently syncing.
1575    is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1576}
1577
1578#[cfg(test)]
1579mod tests {
1580    use super::*;
1581    use alloy_eips::{eip7685::Requests, NumHash};
1582    use alloy_primitives::{Address, Bytes, B256};
1583    use alloy_rpc_types_engine::{
1584        ClientCode, ClientVersionV1, ExecutionPayloadV2, PayloadAttributes, PayloadStatusEnum,
1585    };
1586    use assert_matches::assert_matches;
1587    use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1588    use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1589    use reth_ethereum_engine_primitives::EthEngineTypes;
1590    use reth_ethereum_primitives::Block;
1591    use reth_network_api::{
1592        noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1593    };
1594    use reth_node_ethereum::EthereumEngineValidator;
1595    use reth_payload_builder::test_utils::spawn_test_payload_service;
1596    use reth_provider::{test_utils::MockEthProvider, BalStoreHandle, InMemoryBalStore, RawBal};
1597    use reth_tasks::Runtime;
1598    use reth_transaction_pool::noop::NoopTransactionPool;
1599    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1600
1601    fn setup_engine_api() -> (
1602        EngineApiTestHandle,
1603        EngineApi<
1604            Arc<MockEthProvider>,
1605            EthEngineTypes,
1606            NoopTransactionPool,
1607            EthereumEngineValidator,
1608            ChainSpec,
1609        >,
1610    ) {
1611        let client = ClientVersionV1 {
1612            code: ClientCode::RH,
1613            name: "Reth".to_string(),
1614            version: "v0.2.0-beta.5".to_string(),
1615            commit: "defa64b2".to_string(),
1616        };
1617
1618        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1619        let provider = Arc::new(MockEthProvider::default());
1620        let payload_store = spawn_test_payload_service();
1621        let (to_engine, engine_rx) = unbounded_channel();
1622        let task_executor = Runtime::test();
1623        let api = EngineApi::new(
1624            provider.clone(),
1625            chain_spec.clone(),
1626            ConsensusEngineHandle::new(to_engine),
1627            payload_store.into(),
1628            NoopTransactionPool::default(),
1629            task_executor,
1630            client,
1631            EngineCapabilities::default(),
1632            EthereumEngineValidator::new(chain_spec.clone()),
1633            false,
1634            NoopNetwork::default(),
1635        );
1636        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1637        (handle, api)
1638    }
1639
1640    #[tokio::test]
1641    async fn engine_client_version_v1() {
1642        let client = ClientVersionV1 {
1643            code: ClientCode::RH,
1644            name: "Reth".to_string(),
1645            version: "v0.2.0-beta.5".to_string(),
1646            commit: "defa64b2".to_string(),
1647        };
1648        let (_, api) = setup_engine_api();
1649        let res = api.get_client_version_v1(client.clone());
1650        assert_eq!(res.unwrap(), vec![client]);
1651    }
1652
1653    #[tokio::test]
1654    async fn get_payload_bodies_by_hash_v2_returns_block_access_list_from_store() {
1655        let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1656        let mut provider = MockEthProvider::default();
1657        provider.bal_store = bal_store.clone();
1658        let provider = Arc::new(provider);
1659
1660        let client = ClientVersionV1 {
1661            code: ClientCode::RH,
1662            name: "Reth".to_string(),
1663            version: "v0.2.0-beta.5".to_string(),
1664            commit: "defa64b2".to_string(),
1665        };
1666        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1667        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1668        let (to_engine, _engine_rx) = unbounded_channel();
1669        let api = EngineApi::new(
1670            provider.clone(),
1671            chain_spec.clone(),
1672            ConsensusEngineHandle::new(to_engine),
1673            payload_store.into(),
1674            NoopTransactionPool::default(),
1675            Runtime::test(),
1676            client,
1677            EngineCapabilities::default(),
1678            EthereumEngineValidator::new(chain_spec),
1679            false,
1680            NoopNetwork::default(),
1681        );
1682
1683        let mut block = Block::default();
1684        block.header.number = 1;
1685        let block_hash = block.header.hash_slow();
1686        provider.add_block(block_hash, block);
1687
1688        let mut block_without_bal = Block::default();
1689        block_without_bal.header.number = 2;
1690        let block_without_bal_hash = block_without_bal.header.hash_slow();
1691        provider.add_block(block_without_bal_hash, block_without_bal);
1692
1693        let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1694        bal_store.insert(NumHash::new(1, block_hash), RawBal::new(raw_bal.clone())).unwrap();
1695
1696        let missing_hash = B256::with_last_byte(3);
1697        let response = api
1698            .get_payload_bodies_by_hash_v2(vec![block_hash, block_without_bal_hash, missing_hash])
1699            .await
1700            .unwrap();
1701
1702        assert_eq!(response.len(), 3);
1703        assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1704        assert_eq!(response[1].as_ref().unwrap().block_access_list, None);
1705        assert!(response[2].is_none());
1706    }
1707
1708    #[tokio::test]
1709    async fn get_payload_bodies_by_range_v2_returns_block_access_lists_from_store() {
1710        let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1711        let mut provider = MockEthProvider::default();
1712        provider.bal_store = bal_store.clone();
1713        let provider = Arc::new(provider);
1714
1715        let client = ClientVersionV1 {
1716            code: ClientCode::RH,
1717            name: "Reth".to_string(),
1718            version: "v0.2.0-beta.5".to_string(),
1719            commit: "defa64b2".to_string(),
1720        };
1721        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1722        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1723        let (to_engine, _engine_rx) = unbounded_channel();
1724        let api = EngineApi::new(
1725            provider.clone(),
1726            chain_spec.clone(),
1727            ConsensusEngineHandle::new(to_engine),
1728            payload_store.into(),
1729            NoopTransactionPool::default(),
1730            Runtime::test(),
1731            client,
1732            EngineCapabilities::default(),
1733            EthereumEngineValidator::new(chain_spec),
1734            false,
1735            NoopNetwork::default(),
1736        );
1737
1738        let mut block = Block::default();
1739        block.header.number = 1;
1740        let block_hash = block.header.hash_slow();
1741        provider.add_block(block_hash, block);
1742
1743        let mut block_without_bal = Block::default();
1744        block_without_bal.header.number = 3;
1745        let block_without_bal_hash = block_without_bal.header.hash_slow();
1746        provider.add_block(block_without_bal_hash, block_without_bal);
1747
1748        let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1749        bal_store.insert(NumHash::new(1, block_hash), RawBal::new(raw_bal.clone())).unwrap();
1750
1751        let response = api.get_payload_bodies_by_range_v2(1, 3).await.unwrap();
1752
1753        assert_eq!(response.len(), 3);
1754        assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1755        assert!(response[1].is_none());
1756        assert_eq!(response[2].as_ref().unwrap().block_access_list, None);
1757    }
1758
    /// Test-side handle returned by `setup_engine_api` alongside the [`EngineApi`].
    struct EngineApiTestHandle {
        /// Chain spec the API was constructed with.
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        /// Mock provider shared with the API, so tests can add blocks to it.
        provider: Arc<MockEthProvider>,
        /// Receiving half of the channel the API sends consensus engine messages on.
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }
1765
1766    #[tokio::test]
1767    async fn forwards_responses_to_consensus_engine() {
1768        let (mut handle, api) = setup_engine_api();
1769
1770        tokio::spawn(async move {
1771            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1772            let execution_data = ExecutionData {
1773                payload: payload_v1.into(),
1774                sidecar: ExecutionPayloadSidecar::none(),
1775            };
1776
1777            api.new_payload_v1(execution_data).await.unwrap();
1778        });
1779        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1780    }
1781
1782    #[tokio::test]
1783    async fn new_payload_v5_accepts_amsterdam_payloads() {
1784        let chain_spec = Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1785        let provider = Arc::new(MockEthProvider::default());
1786        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1787        let (to_engine, mut engine_rx) = unbounded_channel();
1788
1789        let api = EngineApi::new(
1790            provider,
1791            chain_spec.clone(),
1792            ConsensusEngineHandle::new(to_engine),
1793            payload_store.into(),
1794            NoopTransactionPool::default(),
1795            Runtime::test(),
1796            ClientVersionV1 {
1797                code: ClientCode::RH,
1798                name: "Reth".to_string(),
1799                version: "v0.0.0-test".to_string(),
1800                commit: "test".to_string(),
1801            },
1802            EngineCapabilities::default(),
1803            EthereumEngineValidator::new(chain_spec),
1804            false,
1805            NoopNetwork::default(),
1806        );
1807
1808        tokio::spawn(async move {
1809            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1810            let payload = ExecutionPayloadV4 {
1811                payload_inner: ExecutionPayloadV3 {
1812                    payload_inner: ExecutionPayloadV2 {
1813                        payload_inner: payload_v1,
1814                        withdrawals: Vec::new(),
1815                    },
1816                    blob_gas_used: 0,
1817                    excess_blob_gas: 0,
1818                },
1819                block_access_list: Bytes::from_static(b"bal"),
1820                slot_number: 1,
1821            };
1822            let execution_data = ExecutionData {
1823                payload: payload.into(),
1824                sidecar: ExecutionPayloadSidecar::v4(
1825                    CancunPayloadFields {
1826                        versioned_hashes: Vec::new(),
1827                        parent_beacon_block_root: B256::ZERO,
1828                    },
1829                    PraguePayloadFields { requests: RequestsOrHash::Requests(Requests::default()) },
1830                ),
1831            };
1832
1833            api.new_payload_v5(execution_data).await.unwrap();
1834        });
1835
1836        assert_matches!(engine_rx.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1837    }
1838
1839    #[derive(Clone)]
1840    struct TestNetworkInfo {
1841        syncing: bool,
1842    }
1843
1844    impl NetworkInfo for TestNetworkInfo {
1845        fn local_addr(&self) -> std::net::SocketAddr {
1846            (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1847        }
1848
1849        async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1850            #[allow(deprecated)]
1851            Ok(NetworkStatus {
1852                client_version: "test".to_string(),
1853                protocol_version: 5,
1854                eth_protocol_info: EthProtocolInfo {
1855                    network: 1,
1856                    difficulty: None,
1857                    genesis: Default::default(),
1858                    config: Default::default(),
1859                    head: Default::default(),
1860                },
1861                capabilities: vec![],
1862            })
1863        }
1864
1865        fn chain_id(&self) -> u64 {
1866            1
1867        }
1868
1869        fn cell_custody(&self) -> &CellCustody {
1870            static CELL_CUSTODY: std::sync::OnceLock<CellCustody> = std::sync::OnceLock::new();
1871            CELL_CUSTODY.get_or_init(CellCustody::default)
1872        }
1873
1874        fn is_syncing(&self) -> bool {
1875            self.syncing
1876        }
1877
1878        fn is_initially_syncing(&self) -> bool {
1879            self.syncing
1880        }
1881    }
1882
1883    #[tokio::test]
1884    async fn get_blobs_v3_returns_null_when_syncing() {
1885        let chain_spec: Arc<ChainSpec> =
1886            Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1887        let provider = Arc::new(MockEthProvider::default());
1888        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1889        let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1890
1891        let api = EngineApi::new(
1892            provider,
1893            chain_spec.clone(),
1894            ConsensusEngineHandle::new(to_engine),
1895            payload_store.into(),
1896            NoopTransactionPool::default(),
1897            Runtime::test(),
1898            ClientVersionV1 {
1899                code: ClientCode::RH,
1900                name: "Reth".to_string(),
1901                version: "v0.0.0-test".to_string(),
1902                commit: "test".to_string(),
1903            },
1904            EngineCapabilities::default(),
1905            EthereumEngineValidator::new(chain_spec),
1906            false,
1907            TestNetworkInfo { syncing: true },
1908        );
1909
1910        let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1911        assert_matches!(res, Ok(None));
1912    }
1913
1914    #[tokio::test]
1915    async fn get_blobs_v4_returns_null_when_syncing() {
1916        let chain_spec: Arc<ChainSpec> =
1917            Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1918        let provider = Arc::new(MockEthProvider::default());
1919        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1920        let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1921
1922        let api = EngineApi::new(
1923            provider,
1924            chain_spec.clone(),
1925            ConsensusEngineHandle::new(to_engine),
1926            payload_store.into(),
1927            NoopTransactionPool::default(),
1928            Runtime::test(),
1929            ClientVersionV1 {
1930                code: ClientCode::RH,
1931                name: "Reth".to_string(),
1932                version: "v0.0.0-test".to_string(),
1933                commit: "test".to_string(),
1934            },
1935            EngineCapabilities::default(),
1936            EthereumEngineValidator::new(chain_spec),
1937            false,
1938            TestNetworkInfo { syncing: true },
1939        );
1940
1941        let res = api.get_blobs_v4_metered(vec![B256::ZERO], B128::from(1u128));
1942        assert_matches!(res, Ok(None));
1943    }
1944
1945    #[tokio::test]
1946    async fn fcu_v4_updates_shared_cell_custody_before_forkchoice_result() {
1947        let chain_spec: Arc<ChainSpec> =
1948            Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1949        let provider = Arc::new(MockEthProvider::default());
1950        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1951        let (to_engine, mut engine_rx) = unbounded_channel();
1952        let network = NoopNetwork::default();
1953        let cell_custody = network.cell_custody().clone();
1954
1955        let api = EngineApi::new(
1956            provider,
1957            chain_spec.clone(),
1958            ConsensusEngineHandle::new(to_engine),
1959            payload_store.into(),
1960            NoopTransactionPool::default(),
1961            Runtime::test(),
1962            ClientVersionV1 {
1963                code: ClientCode::RH,
1964                name: "Reth".to_string(),
1965                version: "v0.0.0-test".to_string(),
1966                commit: "test".to_string(),
1967            },
1968            EngineCapabilities::default(),
1969            EthereumEngineValidator::new(chain_spec),
1970            false,
1971            network,
1972        );
1973
1974        let state = ForkchoiceState {
1975            head_block_hash: B256::from([0x33; 32]),
1976            safe_block_hash: B256::ZERO,
1977            finalized_block_hash: B256::ZERO,
1978        };
1979        let custody_columns = B128::from(0b1010u128);
1980
1981        let api_task = tokio::spawn(async move {
1982            api.fork_choice_updated_v4(state, None, Some(custody_columns)).await
1983        });
1984
1985        let request = tokio::time::timeout(std::time::Duration::from_secs(1), engine_rx.recv())
1986            .await
1987            .expect("timed out waiting for forkchoiceUpdated request")
1988            .expect("expected forkchoiceUpdated request");
1989        let response_tx = match request {
1990            BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1991                assert!(payload_attrs.is_none());
1992                tx
1993            }
1994            other => panic!("unexpected engine message: {other:?}"),
1995        };
1996        assert_eq!(cell_custody.get(), custody_columns);
1997
1998        response_tx
1999            .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2000                PayloadStatusEnum::Valid,
2001            ))))
2002            .expect("send valid response");
2003
2004        api_task
2005            .await
2006            .expect("api task should not panic")
2007            .expect("forkchoiceUpdatedV4 should succeed");
2008        assert_eq!(cell_custody.get(), custody_columns);
2009    }
2010
2011    #[tokio::test]
2012    async fn fcu_v4_updates_shared_cell_custody_when_payload_attrs_invalid() {
2013        let chain_spec: Arc<ChainSpec> =
2014            Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
2015        let provider = Arc::new(MockEthProvider::default());
2016        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
2017        let (to_engine, mut engine_rx) = unbounded_channel();
2018        let network = NoopNetwork::default();
2019        let cell_custody = network.cell_custody().clone();
2020
2021        let api = EngineApi::new(
2022            provider,
2023            chain_spec.clone(),
2024            ConsensusEngineHandle::new(to_engine),
2025            payload_store.into(),
2026            NoopTransactionPool::default(),
2027            Runtime::test(),
2028            ClientVersionV1 {
2029                code: ClientCode::RH,
2030                name: "Reth".to_string(),
2031                version: "v0.0.0-test".to_string(),
2032                commit: "test".to_string(),
2033            },
2034            EngineCapabilities::default(),
2035            EthereumEngineValidator::new(chain_spec),
2036            false,
2037            network,
2038        );
2039
2040        let state = ForkchoiceState {
2041            head_block_hash: B256::from([0x44; 32]),
2042            safe_block_hash: B256::ZERO,
2043            finalized_block_hash: B256::ZERO,
2044        };
2045        let payload_attributes = PayloadAttributes {
2046            timestamp: 1,
2047            prev_randao: B256::ZERO,
2048            suggested_fee_recipient: Address::ZERO,
2049            withdrawals: Some(vec![]),
2050            parent_beacon_block_root: None,
2051            slot_number: None,
2052            target_gas_limit: None,
2053        };
2054        let custody_columns = B128::from(0b1010u128);
2055
2056        let api_task = tokio::spawn(async move {
2057            api.fork_choice_updated_v4(state, Some(payload_attributes), Some(custody_columns)).await
2058        });
2059
2060        let request = tokio::time::timeout(std::time::Duration::from_secs(1), engine_rx.recv())
2061            .await
2062            .expect("timed out waiting for forkchoiceUpdated request")
2063            .expect("expected forkchoiceUpdated request");
2064        let response_tx = match request {
2065            BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2066                assert!(
2067                    payload_attrs.is_none(),
2068                    "when attrs are invalid, API should first evaluate forkchoice without attrs"
2069                );
2070                tx
2071            }
2072            other => panic!("unexpected engine message: {other:?}"),
2073        };
2074        assert_eq!(cell_custody.get(), custody_columns);
2075
2076        response_tx
2077            .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2078                PayloadStatusEnum::Valid,
2079            ))))
2080            .expect("send valid response");
2081
2082        let response = api_task.await.expect("api task should not panic");
2083        assert_matches!(
2084            response,
2085            Err(EngineApiError::EngineObjectValidationError(
2086                reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
2087            ))
2088        );
2089    }
2090
2091    #[tokio::test]
2092    async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
2093        let (mut handle, api) = setup_engine_api();
2094
2095        let state = ForkchoiceState {
2096            head_block_hash: B256::from([0x11; 32]),
2097            safe_block_hash: B256::ZERO,
2098            finalized_block_hash: B256::ZERO,
2099        };
2100        let payload_attributes = PayloadAttributes {
2101            timestamp: 1,
2102            prev_randao: B256::ZERO,
2103            suggested_fee_recipient: Address::ZERO,
2104            withdrawals: Some(vec![]),
2105            // Invalid for V3/Cancun, but should be ignored if forkchoice is SYNCING.
2106            parent_beacon_block_root: None,
2107            slot_number: None,
2108            target_gas_limit: None,
2109        };
2110
2111        let api_task = tokio::spawn(async move {
2112            api.fork_choice_updated_v3(state, Some(payload_attributes)).await
2113        });
2114
2115        let request =
2116            tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
2117                .await
2118                .expect("timed out waiting for forkchoiceUpdated request")
2119                .expect("expected forkchoiceUpdated request");
2120        let response_tx = match request {
2121            BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2122                assert!(
2123                    payload_attrs.is_none(),
2124                    "FCU for syncing state should be evaluated before payload attributes"
2125                );
2126                tx
2127            }
2128            other => panic!("unexpected engine message: {other:?}"),
2129        };
2130
2131        response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
2132
2133        let response = api_task
2134            .await
2135            .expect("api task should not panic")
2136            .expect("forkchoiceUpdatedV3 should return a syncing response");
2137        assert!(response.payload_status.is_syncing());
2138        assert!(response.payload_id.is_none());
2139    }
2140
2141    #[tokio::test]
2142    async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
2143        let (mut handle, api) = setup_engine_api();
2144
2145        let state = ForkchoiceState {
2146            head_block_hash: B256::from([0x22; 32]),
2147            safe_block_hash: B256::ZERO,
2148            finalized_block_hash: B256::ZERO,
2149        };
2150        let payload_attributes = PayloadAttributes {
2151            timestamp: 1,
2152            prev_randao: B256::ZERO,
2153            suggested_fee_recipient: Address::ZERO,
2154            withdrawals: Some(vec![]),
2155            parent_beacon_block_root: None,
2156            slot_number: None,
2157            target_gas_limit: None,
2158        };
2159
2160        let api_task = tokio::spawn(async move {
2161            api.fork_choice_updated_v3(state, Some(payload_attributes)).await
2162        });
2163
2164        let request =
2165            tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
2166                .await
2167                .expect("timed out waiting for forkchoiceUpdated request")
2168                .expect("expected forkchoiceUpdated request");
2169
2170        let response_tx = match request {
2171            BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2172                assert!(
2173                    payload_attrs.is_none(),
2174                    "when attrs are invalid, API should first evaluate forkchoice without attrs"
2175                );
2176                tx
2177            }
2178            other => panic!("unexpected engine message: {other:?}"),
2179        };
2180
2181        response_tx
2182            .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2183                PayloadStatusEnum::Valid,
2184            ))))
2185            .expect("send valid response");
2186
2187        let response = api_task.await.expect("api task should not panic");
2188        assert_matches!(
2189            response,
2190            Err(EngineApiError::EngineObjectValidationError(
2191                reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
2192            ))
2193        );
2194
2195        match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
2196            .await
2197        {
2198            Err(_) | Ok(None) => {}
2199            Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
2200                panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
2201            }
2202            Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
2203        }
2204    }
2205
2206    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
2207    mod get_payload_bodies {
2208        use super::*;
2209        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
2210        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
2211
2212        #[tokio::test]
2213        async fn invalid_params() {
2214            let (_, api) = setup_engine_api();
2215
2216            let by_range_tests = [
2217                // (start, count)
2218                (0, 0),
2219                (0, 1),
2220                (1, 0),
2221            ];
2222
2223            // test [EngineApiMessage::GetPayloadBodiesByRange]
2224            for (start, count) in by_range_tests {
2225                let res = api.get_payload_bodies_by_range_v1(start, count).await;
2226                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
2227            }
2228        }
2229
2230        #[tokio::test]
2231        async fn request_too_large() {
2232            let (_, api) = setup_engine_api();
2233
2234            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
2235            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
2236            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
2237        }
2238
2239        #[tokio::test]
2240        async fn returns_payload_bodies() {
2241            let mut rng = generators::rng();
2242            let (handle, api) = setup_engine_api();
2243
2244            let (start, count) = (1, 10);
2245            let blocks = random_block_range(
2246                &mut rng,
2247                start..=start + count - 1,
2248                BlockRangeParams { tx_count: 0..2, ..Default::default() },
2249            );
2250            handle
2251                .provider
2252                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
2253
2254            let expected = blocks
2255                .iter()
2256                .cloned()
2257                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
2258                .collect::<Vec<_>>();
2259
2260            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2261            assert_eq!(res, expected);
2262        }
2263
2264        #[tokio::test]
2265        async fn returns_payload_bodies_with_gaps() {
2266            let mut rng = generators::rng();
2267            let (handle, api) = setup_engine_api();
2268
2269            let (start, count) = (1, 100);
2270            let blocks = random_block_range(
2271                &mut rng,
2272                start..=start + count - 1,
2273                BlockRangeParams { tx_count: 0..2, ..Default::default() },
2274            );
2275
2276            // Insert only blocks in ranges 1-25 and 50-75
2277            let first_missing_range = 26..=50;
2278            let second_missing_range = 76..=100;
2279            handle.provider.extend_blocks(
2280                blocks
2281                    .iter()
2282                    .filter(|b| {
2283                        !first_missing_range.contains(&b.number) &&
2284                            !second_missing_range.contains(&b.number)
2285                    })
2286                    .map(|b| (b.hash(), b.clone().into_block())),
2287            );
2288
2289            let expected = blocks
2290                .iter()
2291                // filter anything after the second missing range to ensure we don't expect trailing
2292                // `None`s
2293                .filter(|b| !second_missing_range.contains(&b.number))
2294                .cloned()
2295                .map(|b| {
2296                    if first_missing_range.contains(&b.number) {
2297                        None
2298                    } else {
2299                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2300                    }
2301                })
2302                .collect::<Vec<_>>();
2303
2304            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2305            assert_eq!(res, expected);
2306
2307            let expected = blocks
2308                .iter()
2309                .cloned()
2310                // ensure we still return trailing `None`s here because by-hash will not be aware
2311                // of the missing block's number, and cannot compare it to the current best block
2312                .map(|b| {
2313                    if first_missing_range.contains(&b.number) ||
2314                        second_missing_range.contains(&b.number)
2315                    {
2316                        None
2317                    } else {
2318                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2319                    }
2320                })
2321                .collect::<Vec<_>>();
2322
2323            let hashes = blocks.iter().map(|b| b.hash()).collect();
2324            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
2325            assert_eq!(res, expected);
2326        }
2327    }
2328}