reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes, EngineValidator};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24    validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload,
25    PayloadBuilderAttributes, PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_rpc_server_types::result::internal_rpc_err;
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::TaskSpawner;
32use reth_transaction_pool::TransactionPool;
33use std::{sync::Arc, time::Instant};
34use tokio::sync::oneshot;
35use tracing::{trace, warn};
36
37/// The Engine API response sender.
38pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
39
40/// The upper limit for payload bodies request.
41const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
42
43/// The upper limit blobs `eth_getBlobs`.
44const MAX_BLOB_LIMIT: usize = 128;
45
46/// The Engine API implementation that grants the Consensus layer access to data and
47/// functions in the Execution layer that are crucial for the consensus process.
48///
49/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
50/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
51/// are still follow the engine API model.
52///
53/// ## Implementers
54///
55/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
56/// server trait and implementing it on a type that can wrap this [`EngineApi`] type.
57/// See also [`EngineApiServer`] implementation for this type which is the L1 implementation.
58pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
59    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
60}
61
62struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
63    /// The provider to interact with the chain.
64    provider: Provider,
65    /// Consensus configuration
66    chain_spec: Arc<ChainSpec>,
67    /// The channel to send messages to the beacon consensus engine.
68    beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
69    /// The type that can communicate with the payload service to retrieve payloads.
70    payload_store: PayloadStore<PayloadT>,
71    /// For spawning and executing async tasks
72    task_spawner: Box<dyn TaskSpawner>,
73    /// The latency and response type metrics for engine api calls
74    metrics: EngineApiMetrics,
75    /// Identification of the execution client used by the consensus client
76    client: ClientVersionV1,
77    /// The list of all supported Engine capabilities available over the engine endpoint.
78    capabilities: EngineCapabilities,
79    /// Transaction pool.
80    tx_pool: Pool,
81    /// Engine validator.
82    validator: Validator,
83    /// Start time of the latest payload request
84    latest_new_payload_response: Mutex<Option<Instant>>,
85    accept_execution_requests_hash: bool,
86}
87
88impl<Provider, PayloadT, Pool, Validator, ChainSpec>
89    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
90where
91    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
92    PayloadT: PayloadTypes,
93    Pool: TransactionPool + 'static,
94    Validator: EngineValidator<PayloadT>,
95    ChainSpec: EthereumHardforks + Send + Sync + 'static,
96{
97    /// Create new instance of [`EngineApi`].
98    #[expect(clippy::too_many_arguments)]
99    pub fn new(
100        provider: Provider,
101        chain_spec: Arc<ChainSpec>,
102        beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
103        payload_store: PayloadStore<PayloadT>,
104        tx_pool: Pool,
105        task_spawner: Box<dyn TaskSpawner>,
106        client: ClientVersionV1,
107        capabilities: EngineCapabilities,
108        validator: Validator,
109        accept_execution_requests_hash: bool,
110    ) -> Self {
111        let inner = Arc::new(EngineApiInner {
112            provider,
113            chain_spec,
114            beacon_consensus,
115            payload_store,
116            task_spawner,
117            metrics: EngineApiMetrics::default(),
118            client,
119            capabilities,
120            tx_pool,
121            validator,
122            latest_new_payload_response: Mutex::new(None),
123            accept_execution_requests_hash,
124        });
125        Self { inner }
126    }
127
128    /// Fetches the client version.
129    pub fn get_client_version_v1(
130        &self,
131        _client: ClientVersionV1,
132    ) -> EngineApiResult<Vec<ClientVersionV1>> {
133        Ok(vec![self.inner.client.clone()])
134    }
135
136    /// Fetches the attributes for the payload with the given id.
137    async fn get_payload_attributes(
138        &self,
139        payload_id: PayloadId,
140    ) -> EngineApiResult<PayloadT::PayloadBuilderAttributes> {
141        Ok(self
142            .inner
143            .payload_store
144            .payload_attributes(payload_id)
145            .await
146            .ok_or(EngineApiError::UnknownPayload)??)
147    }
148
149    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
150    /// Caution: This should not accept the `withdrawals` field
151    pub async fn new_payload_v1(
152        &self,
153        payload: PayloadT::ExecutionData,
154    ) -> EngineApiResult<PayloadStatus> {
155        let payload_or_attrs = PayloadOrAttributes::<
156            '_,
157            PayloadT::ExecutionData,
158            PayloadT::PayloadAttributes,
159        >::from_execution_payload(&payload);
160
161        self.inner
162            .validator
163            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
164
165        Ok(self
166            .inner
167            .beacon_consensus
168            .new_payload(payload)
169            .await
170            .inspect(|_| self.inner.on_new_payload_response())?)
171    }
172
173    /// Metered version of `new_payload_v1`.
174    async fn new_payload_v1_metered(
175        &self,
176        payload: PayloadT::ExecutionData,
177    ) -> EngineApiResult<PayloadStatus> {
178        let start = Instant::now();
179        let gas_used = payload.gas_used();
180
181        let res = Self::new_payload_v1(self, payload).await;
182        let elapsed = start.elapsed();
183        self.inner.metrics.latency.new_payload_v1.record(elapsed);
184        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
185        res
186    }
187
188    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
189    pub async fn new_payload_v2(
190        &self,
191        payload: PayloadT::ExecutionData,
192    ) -> EngineApiResult<PayloadStatus> {
193        let payload_or_attrs = PayloadOrAttributes::<
194            '_,
195            PayloadT::ExecutionData,
196            PayloadT::PayloadAttributes,
197        >::from_execution_payload(&payload);
198        self.inner
199            .validator
200            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
201        Ok(self
202            .inner
203            .beacon_consensus
204            .new_payload(payload)
205            .await
206            .inspect(|_| self.inner.on_new_payload_response())?)
207    }
208
209    /// Metered version of `new_payload_v2`.
210    pub async fn new_payload_v2_metered(
211        &self,
212        payload: PayloadT::ExecutionData,
213    ) -> EngineApiResult<PayloadStatus> {
214        let start = Instant::now();
215        let gas_used = payload.gas_used();
216
217        let res = Self::new_payload_v2(self, payload).await;
218        let elapsed = start.elapsed();
219        self.inner.metrics.latency.new_payload_v2.record(elapsed);
220        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
221        res
222    }
223
224    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
225    pub async fn new_payload_v3(
226        &self,
227        payload: PayloadT::ExecutionData,
228    ) -> EngineApiResult<PayloadStatus> {
229        let payload_or_attrs = PayloadOrAttributes::<
230            '_,
231            PayloadT::ExecutionData,
232            PayloadT::PayloadAttributes,
233        >::from_execution_payload(&payload);
234        self.inner
235            .validator
236            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
237
238        Ok(self
239            .inner
240            .beacon_consensus
241            .new_payload(payload)
242            .await
243            .inspect(|_| self.inner.on_new_payload_response())?)
244    }
245
246    /// Metrics version of `new_payload_v3`
247    pub async fn new_payload_v3_metered(
248        &self,
249        payload: PayloadT::ExecutionData,
250    ) -> RpcResult<PayloadStatus> {
251        let start = Instant::now();
252        let gas_used = payload.gas_used();
253
254        let res = Self::new_payload_v3(self, payload).await;
255        let elapsed = start.elapsed();
256        self.inner.metrics.latency.new_payload_v3.record(elapsed);
257        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
258        Ok(res?)
259    }
260
261    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
262    pub async fn new_payload_v4(
263        &self,
264        payload: PayloadT::ExecutionData,
265    ) -> EngineApiResult<PayloadStatus> {
266        let payload_or_attrs = PayloadOrAttributes::<
267            '_,
268            PayloadT::ExecutionData,
269            PayloadT::PayloadAttributes,
270        >::from_execution_payload(&payload);
271        self.inner
272            .validator
273            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
274
275        Ok(self
276            .inner
277            .beacon_consensus
278            .new_payload(payload)
279            .await
280            .inspect(|_| self.inner.on_new_payload_response())?)
281    }
282
283    /// Metrics version of `new_payload_v4`
284    pub async fn new_payload_v4_metered(
285        &self,
286        payload: PayloadT::ExecutionData,
287    ) -> RpcResult<PayloadStatus> {
288        let start = Instant::now();
289        let gas_used = payload.gas_used();
290
291        let res = Self::new_payload_v4(self, payload).await;
292
293        let elapsed = start.elapsed();
294        self.inner.metrics.latency.new_payload_v4.record(elapsed);
295        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
296        Ok(res?)
297    }
298}
299
300impl<Provider, EngineT, Pool, Validator, ChainSpec>
301    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
302where
303    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
304    EngineT: EngineTypes,
305    Pool: TransactionPool + 'static,
306    Validator: EngineValidator<EngineT>,
307    ChainSpec: EthereumHardforks + Send + Sync + 'static,
308{
309    /// Sends a message to the beacon consensus engine to update the fork choice _without_
310    /// withdrawals.
311    ///
312    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
313    ///
314    /// Caution: This should not accept the `withdrawals` field
315    pub async fn fork_choice_updated_v1(
316        &self,
317        state: ForkchoiceState,
318        payload_attrs: Option<EngineT::PayloadAttributes>,
319    ) -> EngineApiResult<ForkchoiceUpdated> {
320        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
321            .await
322    }
323
324    /// Metrics version of `fork_choice_updated_v1`
325    pub async fn fork_choice_updated_v1_metered(
326        &self,
327        state: ForkchoiceState,
328        payload_attrs: Option<EngineT::PayloadAttributes>,
329    ) -> EngineApiResult<ForkchoiceUpdated> {
330        let start = Instant::now();
331        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
332        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
333        self.inner.metrics.fcu_response.update_response_metrics(&res);
334        res
335    }
336
337    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
338    /// but only _after_ shanghai.
339    ///
340    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
341    pub async fn fork_choice_updated_v2(
342        &self,
343        state: ForkchoiceState,
344        payload_attrs: Option<EngineT::PayloadAttributes>,
345    ) -> EngineApiResult<ForkchoiceUpdated> {
346        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
347            .await
348    }
349
350    /// Metrics version of `fork_choice_updated_v2`
351    pub async fn fork_choice_updated_v2_metered(
352        &self,
353        state: ForkchoiceState,
354        payload_attrs: Option<EngineT::PayloadAttributes>,
355    ) -> EngineApiResult<ForkchoiceUpdated> {
356        let start = Instant::now();
357        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
358        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
359        self.inner.metrics.fcu_response.update_response_metrics(&res);
360        res
361    }
362
363    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
364    /// but only _after_ cancun.
365    ///
366    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
367    pub async fn fork_choice_updated_v3(
368        &self,
369        state: ForkchoiceState,
370        payload_attrs: Option<EngineT::PayloadAttributes>,
371    ) -> EngineApiResult<ForkchoiceUpdated> {
372        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
373            .await
374    }
375
376    /// Metrics version of `fork_choice_updated_v3`
377    pub async fn fork_choice_updated_v3_metered(
378        &self,
379        state: ForkchoiceState,
380        payload_attrs: Option<EngineT::PayloadAttributes>,
381    ) -> EngineApiResult<ForkchoiceUpdated> {
382        let start = Instant::now();
383        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
384        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
385        self.inner.metrics.fcu_response.update_response_metrics(&res);
386        res
387    }
388
389    /// Returns the most recent version of the payload that is available in the corresponding
390    /// payload build process at the time of receiving this call.
391    ///
392    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
393    ///
394    /// Caution: This should not return the `withdrawals` field
395    ///
396    /// Note:
397    /// > Provider software MAY stop the corresponding build process after serving this call.
398    pub async fn get_payload_v1(
399        &self,
400        payload_id: PayloadId,
401    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
402        self.inner
403            .payload_store
404            .resolve(payload_id)
405            .await
406            .ok_or(EngineApiError::UnknownPayload)?
407            .map_err(|_| EngineApiError::UnknownPayload)?
408            .try_into()
409            .map_err(|_| {
410                warn!("could not transform built payload into ExecutionPayloadV1");
411                EngineApiError::UnknownPayload
412            })
413    }
414
415    /// Metrics version of `get_payload_v1`
416    pub async fn get_payload_v1_metered(
417        &self,
418        payload_id: PayloadId,
419    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
420        let start = Instant::now();
421        let res = Self::get_payload_v1(self, payload_id).await;
422        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
423        res
424    }
425
426    /// Returns the most recent version of the payload that is available in the corresponding
427    /// payload build process at the time of receiving this call.
428    ///
429    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
430    ///
431    /// Note:
432    /// > Provider software MAY stop the corresponding build process after serving this call.
433    pub async fn get_payload_v2(
434        &self,
435        payload_id: PayloadId,
436    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
437        // First we fetch the payload attributes to check the timestamp
438        let attributes = self.get_payload_attributes(payload_id).await?;
439
440        // validate timestamp according to engine rules
441        validate_payload_timestamp(
442            &self.inner.chain_spec,
443            EngineApiMessageVersion::V2,
444            attributes.timestamp(),
445        )?;
446
447        // Now resolve the payload
448        self.inner
449            .payload_store
450            .resolve(payload_id)
451            .await
452            .ok_or(EngineApiError::UnknownPayload)?
453            .map_err(|_| EngineApiError::UnknownPayload)?
454            .try_into()
455            .map_err(|_| {
456                warn!("could not transform built payload into ExecutionPayloadV2");
457                EngineApiError::UnknownPayload
458            })
459    }
460
461    /// Metrics version of `get_payload_v2`
462    pub async fn get_payload_v2_metered(
463        &self,
464        payload_id: PayloadId,
465    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
466        let start = Instant::now();
467        let res = Self::get_payload_v2(self, payload_id).await;
468        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
469        res
470    }
471
472    /// Returns the most recent version of the payload that is available in the corresponding
473    /// payload build process at the time of receiving this call.
474    ///
475    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
476    ///
477    /// Note:
478    /// > Provider software MAY stop the corresponding build process after serving this call.
479    pub async fn get_payload_v3(
480        &self,
481        payload_id: PayloadId,
482    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
483        // First we fetch the payload attributes to check the timestamp
484        let attributes = self.get_payload_attributes(payload_id).await?;
485
486        // validate timestamp according to engine rules
487        validate_payload_timestamp(
488            &self.inner.chain_spec,
489            EngineApiMessageVersion::V3,
490            attributes.timestamp(),
491        )?;
492
493        // Now resolve the payload
494        self.inner
495            .payload_store
496            .resolve(payload_id)
497            .await
498            .ok_or(EngineApiError::UnknownPayload)?
499            .map_err(|_| EngineApiError::UnknownPayload)?
500            .try_into()
501            .map_err(|_| {
502                warn!("could not transform built payload into ExecutionPayloadV3");
503                EngineApiError::UnknownPayload
504            })
505    }
506
507    /// Metrics version of `get_payload_v3`
508    pub async fn get_payload_v3_metered(
509        &self,
510        payload_id: PayloadId,
511    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
512        let start = Instant::now();
513        let res = Self::get_payload_v3(self, payload_id).await;
514        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
515        res
516    }
517
518    /// Returns the most recent version of the payload that is available in the corresponding
519    /// payload build process at the time of receiving this call.
520    ///
521    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
522    ///
523    /// Note:
524    /// > Provider software MAY stop the corresponding build process after serving this call.
525    pub async fn get_payload_v4(
526        &self,
527        payload_id: PayloadId,
528    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
529        // First we fetch the payload attributes to check the timestamp
530        let attributes = self.get_payload_attributes(payload_id).await?;
531
532        // validate timestamp according to engine rules
533        validate_payload_timestamp(
534            &self.inner.chain_spec,
535            EngineApiMessageVersion::V4,
536            attributes.timestamp(),
537        )?;
538
539        // Now resolve the payload
540        self.inner
541            .payload_store
542            .resolve(payload_id)
543            .await
544            .ok_or(EngineApiError::UnknownPayload)?
545            .map_err(|_| EngineApiError::UnknownPayload)?
546            .try_into()
547            .map_err(|_| {
548                warn!("could not transform built payload into ExecutionPayloadV3");
549                EngineApiError::UnknownPayload
550            })
551    }
552
553    /// Metrics version of `get_payload_v4`
554    pub async fn get_payload_v4_metered(
555        &self,
556        payload_id: PayloadId,
557    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
558        let start = Instant::now();
559        let res = Self::get_payload_v4(self, payload_id).await;
560        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
561        res
562    }
563
564    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
565    /// blocks and returns the mapped payload bodies.
566    pub async fn get_payload_bodies_by_range_with<F, R>(
567        &self,
568        start: BlockNumber,
569        count: u64,
570        f: F,
571    ) -> EngineApiResult<Vec<Option<R>>>
572    where
573        F: Fn(Provider::Block) -> R + Send + 'static,
574        R: Send + 'static,
575    {
576        let (tx, rx) = oneshot::channel();
577        let inner = self.inner.clone();
578
579        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
580            if count > MAX_PAYLOAD_BODIES_LIMIT {
581                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
582                return;
583            }
584
585            if start == 0 || count == 0 {
586                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
587                return;
588            }
589
590            let mut result = Vec::with_capacity(count as usize);
591
592            // -1 so range is inclusive
593            let mut end = start.saturating_add(count - 1);
594
595            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
596            // truncate the end if it's greater than the last block
597            if let Ok(best_block) = inner.provider.best_block_number() {
598                if end > best_block {
599                    end = best_block;
600                }
601            }
602
603            for num in start..=end {
604                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
605                match block_result {
606                    Ok(block) => {
607                        result.push(block.map(&f));
608                    }
609                    Err(err) => {
610                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
611                        return;
612                    }
613                };
614            }
615            tx.send(Ok(result)).ok();
616        }));
617
618        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
619    }
620
621    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
622    /// blocks.
623    ///
624    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
625    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
626    /// adversarial.
627    ///
628    /// Implementers should take care when acting on the input to this method, specifically
629    /// ensuring that the range is limited properly, and that the range boundaries are computed
630    /// correctly and without panics.
631    pub async fn get_payload_bodies_by_range_v1(
632        &self,
633        start: BlockNumber,
634        count: u64,
635    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
636        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
637            transactions: block.body().encoded_2718_transactions(),
638            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
639        })
640        .await
641    }
642
643    /// Metrics version of `get_payload_bodies_by_range_v1`
644    pub async fn get_payload_bodies_by_range_v1_metered(
645        &self,
646        start: BlockNumber,
647        count: u64,
648    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
649        let start_time = Instant::now();
650        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
651        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
652        res
653    }
654
655    /// Called to retrieve execution payload bodies by hashes.
656    pub async fn get_payload_bodies_by_hash_with<F, R>(
657        &self,
658        hashes: Vec<BlockHash>,
659        f: F,
660    ) -> EngineApiResult<Vec<Option<R>>>
661    where
662        F: Fn(Provider::Block) -> R + Send + 'static,
663        R: Send + 'static,
664    {
665        let len = hashes.len() as u64;
666        if len > MAX_PAYLOAD_BODIES_LIMIT {
667            return Err(EngineApiError::PayloadRequestTooLarge { len });
668        }
669
670        let (tx, rx) = oneshot::channel();
671        let inner = self.inner.clone();
672
673        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
674            let mut result = Vec::with_capacity(hashes.len());
675            for hash in hashes {
676                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
677                match block_result {
678                    Ok(block) => {
679                        result.push(block.map(&f));
680                    }
681                    Err(err) => {
682                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
683                        return;
684                    }
685                }
686            }
687            tx.send(Ok(result)).ok();
688        }));
689
690        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
691    }
692
693    /// Called to retrieve execution payload bodies by hashes.
694    pub async fn get_payload_bodies_by_hash_v1(
695        &self,
696        hashes: Vec<BlockHash>,
697    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
698        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
699            transactions: block.body().encoded_2718_transactions(),
700            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
701        })
702        .await
703    }
704
705    /// Metrics version of `get_payload_bodies_by_hash_v1`
706    pub async fn get_payload_bodies_by_hash_v1_metered(
707        &self,
708        hashes: Vec<BlockHash>,
709    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
710        let start = Instant::now();
711        let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
712        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
713        res.await
714    }
715
716    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
717    /// update.
718    ///
719    /// The payload attributes will be validated according to the engine API rules for the given
720    /// message version:
721    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
722    ///   validated according to the Paris rules.
723    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
724    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
725    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
726    ///
727    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
728    ///   validated according to the Cancun rules.
729    async fn validate_and_execute_forkchoice(
730        &self,
731        version: EngineApiMessageVersion,
732        state: ForkchoiceState,
733        payload_attrs: Option<EngineT::PayloadAttributes>,
734    ) -> EngineApiResult<ForkchoiceUpdated> {
735        self.inner.record_elapsed_time_on_fcu();
736
737        if let Some(ref attrs) = payload_attrs {
738            let attr_validation_res =
739                self.inner.validator.ensure_well_formed_attributes(version, attrs);
740
741            // From the engine API spec:
742            //
743            // Client software MUST ensure that payloadAttributes.timestamp is greater than
744            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
745            // isn't held client software MUST respond with -38003: Invalid payload attributes and
746            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
747            // update MUST NOT be rolled back.
748            //
749            // NOTE: This will also apply to the validation result for the cancun or
750            // shanghai-specific fields provided in the payload attributes.
751            //
752            // To do this, we set the payload attrs to `None` if attribute validation failed, but
753            // we still apply the forkchoice update.
754            if let Err(err) = attr_validation_res {
755                let fcu_res =
756                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
757                // TODO: decide if we want this branch - the FCU INVALID response might be more
758                // useful than the payload attributes INVALID response
759                if fcu_res.is_invalid() {
760                    return Ok(fcu_res)
761                }
762                return Err(err.into())
763            }
764        }
765
766        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
767    }
768
769    /// Returns reference to supported capabilities.
770    pub fn capabilities(&self) -> &EngineCapabilities {
771        &self.inner.capabilities
772    }
773
774    fn get_blobs_v1(
775        &self,
776        versioned_hashes: Vec<B256>,
777    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
778        if versioned_hashes.len() > MAX_BLOB_LIMIT {
779            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
780        }
781
782        self.inner
783            .tx_pool
784            .get_blobs_for_versioned_hashes(&versioned_hashes)
785            .map_err(|err| EngineApiError::Internal(Box::new(err)))
786    }
787
788    fn get_blobs_v1_metered(
789        &self,
790        versioned_hashes: Vec<B256>,
791    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
792        let hashes_len = versioned_hashes.len();
793        let start = Instant::now();
794        let res = Self::get_blobs_v1(self, versioned_hashes);
795        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
796
797        if let Ok(blobs) = &res {
798            let blobs_found = blobs.iter().flatten().count();
799            let blobs_missed = hashes_len - blobs_found;
800
801            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
802            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
803        }
804
805        res
806    }
807}
808
809impl<Provider, PayloadT, Pool, Validator, ChainSpec>
810    EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
811where
812    PayloadT: PayloadTypes,
813{
814    /// Tracks the elapsed time between the new payload response and the received forkchoice update
815    /// request.
816    fn record_elapsed_time_on_fcu(&self) {
817        if let Some(start_time) = self.latest_new_payload_response.lock().take() {
818            let elapsed_time = start_time.elapsed();
819            self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
820        }
821    }
822
823    /// Updates the timestamp for the latest new payload response.
824    fn on_new_payload_response(&self) {
825        self.latest_new_payload_response.lock().replace(Instant::now());
826    }
827}
828
829// This is the concrete ethereum engine API implementation.
830#[async_trait]
831impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
832    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
833where
834    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
835    EngineT: EngineTypes<ExecutionData = ExecutionData>,
836    Pool: TransactionPool + 'static,
837    Validator: EngineValidator<EngineT>,
838    ChainSpec: EthereumHardforks + Send + Sync + 'static,
839{
840    /// Handler for `engine_newPayloadV1`
841    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
842    /// Caution: This should not accept the `withdrawals` field
843    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
844        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
845        let payload =
846            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
847        Ok(self.new_payload_v1_metered(payload).await?)
848    }
849
850    /// Handler for `engine_newPayloadV2`
851    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
852    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
853        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
854        let payload = ExecutionData {
855            payload: payload.into_payload(),
856            sidecar: ExecutionPayloadSidecar::none(),
857        };
858
859        Ok(self.new_payload_v2_metered(payload).await?)
860    }
861
862    /// Handler for `engine_newPayloadV3`
863    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
864    async fn new_payload_v3(
865        &self,
866        payload: ExecutionPayloadV3,
867        versioned_hashes: Vec<B256>,
868        parent_beacon_block_root: B256,
869    ) -> RpcResult<PayloadStatus> {
870        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
871        let payload = ExecutionData {
872            payload: payload.into(),
873            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
874                versioned_hashes,
875                parent_beacon_block_root,
876            }),
877        };
878
879        Ok(self.new_payload_v3_metered(payload).await?)
880    }
881
882    /// Handler for `engine_newPayloadV4`
883    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
884    async fn new_payload_v4(
885        &self,
886        payload: ExecutionPayloadV3,
887        versioned_hashes: Vec<B256>,
888        parent_beacon_block_root: B256,
889        requests: RequestsOrHash,
890    ) -> RpcResult<PayloadStatus> {
891        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
892
893        // Accept requests as a hash only if it is explicitly allowed
894        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
895            return Err(EngineApiError::UnexpectedRequestsHash.into());
896        }
897
898        let payload = ExecutionData {
899            payload: payload.into(),
900            sidecar: ExecutionPayloadSidecar::v4(
901                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
902                PraguePayloadFields { requests },
903            ),
904        };
905
906        Ok(self.new_payload_v4_metered(payload).await?)
907    }
908
909    /// Handler for `engine_forkchoiceUpdatedV1`
910    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
911    ///
912    /// Caution: This should not accept the `withdrawals` field
913    async fn fork_choice_updated_v1(
914        &self,
915        fork_choice_state: ForkchoiceState,
916        payload_attributes: Option<EngineT::PayloadAttributes>,
917    ) -> RpcResult<ForkchoiceUpdated> {
918        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
919        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
920    }
921
922    /// Handler for `engine_forkchoiceUpdatedV2`
923    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
924    async fn fork_choice_updated_v2(
925        &self,
926        fork_choice_state: ForkchoiceState,
927        payload_attributes: Option<EngineT::PayloadAttributes>,
928    ) -> RpcResult<ForkchoiceUpdated> {
929        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
930        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
931    }
932
933    /// Handler for `engine_forkchoiceUpdatedV2`
934    ///
935    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
936    async fn fork_choice_updated_v3(
937        &self,
938        fork_choice_state: ForkchoiceState,
939        payload_attributes: Option<EngineT::PayloadAttributes>,
940    ) -> RpcResult<ForkchoiceUpdated> {
941        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
942        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
943    }
944
945    /// Handler for `engine_getPayloadV1`
946    ///
947    /// Returns the most recent version of the payload that is available in the corresponding
948    /// payload build process at the time of receiving this call.
949    ///
950    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
951    ///
952    /// Caution: This should not return the `withdrawals` field
953    ///
954    /// Note:
955    /// > Provider software MAY stop the corresponding build process after serving this call.
956    async fn get_payload_v1(
957        &self,
958        payload_id: PayloadId,
959    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
960        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
961        Ok(self.get_payload_v1_metered(payload_id).await?)
962    }
963
964    /// Handler for `engine_getPayloadV2`
965    ///
966    /// Returns the most recent version of the payload that is available in the corresponding
967    /// payload build process at the time of receiving this call.
968    ///
969    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
970    ///
971    /// Note:
972    /// > Provider software MAY stop the corresponding build process after serving this call.
973    async fn get_payload_v2(
974        &self,
975        payload_id: PayloadId,
976    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
977        trace!(target: "rpc::engine", "Serving engine_getPayloadV2");
978        Ok(self.get_payload_v2_metered(payload_id).await?)
979    }
980
981    /// Handler for `engine_getPayloadV3`
982    ///
983    /// Returns the most recent version of the payload that is available in the corresponding
984    /// payload build process at the time of receiving this call.
985    ///
986    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
987    ///
988    /// Note:
989    /// > Provider software MAY stop the corresponding build process after serving this call.
990    async fn get_payload_v3(
991        &self,
992        payload_id: PayloadId,
993    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
994        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
995        Ok(self.get_payload_v3_metered(payload_id).await?)
996    }
997
998    /// Handler for `engine_getPayloadV4`
999    ///
1000    /// Returns the most recent version of the payload that is available in the corresponding
1001    /// payload build process at the time of receiving this call.
1002    ///
1003    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1004    ///
1005    /// Note:
1006    /// > Provider software MAY stop the corresponding build process after serving this call.
1007    async fn get_payload_v4(
1008        &self,
1009        payload_id: PayloadId,
1010    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1011        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1012        Ok(self.get_payload_v4_metered(payload_id).await?)
1013    }
1014
1015    /// Handler for `engine_getPayloadBodiesByHashV1`
1016    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1017    async fn get_payload_bodies_by_hash_v1(
1018        &self,
1019        block_hashes: Vec<BlockHash>,
1020    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1021        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1022        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1023    }
1024
1025    /// Handler for `engine_getPayloadBodiesByRangeV1`
1026    ///
1027    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1028    ///
1029    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1030    /// blocks.
1031    ///
1032    /// WARNING: This method is associated with the BeaconBlocksByRange message in the consensus
1033    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1034    /// adversarial.
1035    ///
1036    /// Implementers should take care when acting on the input to this method, specifically
1037    /// ensuring that the range is limited properly, and that the range boundaries are computed
1038    /// correctly and without panics.
1039    ///
1040    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1041    async fn get_payload_bodies_by_range_v1(
1042        &self,
1043        start: U64,
1044        count: U64,
1045    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1046        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1047        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1048    }
1049
1050    /// Handler for `engine_getClientVersionV1`
1051    ///
1052    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1053    async fn get_client_version_v1(
1054        &self,
1055        client: ClientVersionV1,
1056    ) -> RpcResult<Vec<ClientVersionV1>> {
1057        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1058        Ok(Self::get_client_version_v1(self, client)?)
1059    }
1060
1061    /// Handler for `engine_exchangeCapabilitiesV1`
1062    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1063    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1064        Ok(self.capabilities().list())
1065    }
1066
1067    async fn get_blobs_v1(
1068        &self,
1069        versioned_hashes: Vec<B256>,
1070    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1071        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1072        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1073    }
1074
1075    async fn get_blobs_v2(
1076        &self,
1077        _versioned_hashes: Vec<B256>,
1078    ) -> RpcResult<Vec<Option<BlobAndProofV2>>> {
1079        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1080        Err(internal_rpc_err("unimplemented"))
1081    }
1082}
1083
1084impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1085    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1086where
1087    EngineT: EngineTypes,
1088    Self: EngineApiServer<EngineT>,
1089{
1090    fn into_rpc_module(self) -> RpcModule<()> {
1091        self.into_rpc().remove_context()
1092    }
1093}
1094
1095impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1096    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1097where
1098    PayloadT: PayloadTypes,
1099{
1100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1101        f.debug_struct("EngineApi").finish_non_exhaustive()
1102    }
1103}
1104
1105#[cfg(test)]
1106mod tests {
1107    use super::*;
1108    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1109    use assert_matches::assert_matches;
1110    use reth_chainspec::{ChainSpec, MAINNET};
1111    use reth_engine_primitives::BeaconEngineMessage;
1112    use reth_ethereum_engine_primitives::EthEngineTypes;
1113    use reth_ethereum_primitives::Block;
1114    use reth_node_ethereum::EthereumEngineValidator;
1115    use reth_payload_builder::test_utils::spawn_test_payload_service;
1116    use reth_provider::test_utils::MockEthProvider;
1117    use reth_tasks::TokioTaskExecutor;
1118    use reth_transaction_pool::noop::NoopTransactionPool;
1119    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1120
1121    fn setup_engine_api() -> (
1122        EngineApiTestHandle,
1123        EngineApi<
1124            Arc<MockEthProvider>,
1125            EthEngineTypes,
1126            NoopTransactionPool,
1127            EthereumEngineValidator,
1128            ChainSpec,
1129        >,
1130    ) {
1131        let client = ClientVersionV1 {
1132            code: ClientCode::RH,
1133            name: "Reth".to_string(),
1134            version: "v0.2.0-beta.5".to_string(),
1135            commit: "defa64b2".to_string(),
1136        };
1137
1138        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1139        let provider = Arc::new(MockEthProvider::default());
1140        let payload_store = spawn_test_payload_service();
1141        let (to_engine, engine_rx) = unbounded_channel();
1142        let task_executor = Box::<TokioTaskExecutor>::default();
1143        let api = EngineApi::new(
1144            provider.clone(),
1145            chain_spec.clone(),
1146            BeaconConsensusEngineHandle::new(to_engine),
1147            payload_store.into(),
1148            NoopTransactionPool::default(),
1149            task_executor,
1150            client,
1151            EngineCapabilities::default(),
1152            EthereumEngineValidator::new(chain_spec.clone()),
1153            false,
1154        );
1155        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1156        (handle, api)
1157    }
1158
1159    #[tokio::test]
1160    async fn engine_client_version_v1() {
1161        let client = ClientVersionV1 {
1162            code: ClientCode::RH,
1163            name: "Reth".to_string(),
1164            version: "v0.2.0-beta.5".to_string(),
1165            commit: "defa64b2".to_string(),
1166        };
1167        let (_, api) = setup_engine_api();
1168        let res = api.get_client_version_v1(client.clone());
1169        assert_eq!(res.unwrap(), vec![client]);
1170    }
1171
1172    struct EngineApiTestHandle {
1173        #[allow(dead_code)]
1174        chain_spec: Arc<ChainSpec>,
1175        provider: Arc<MockEthProvider>,
1176        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1177    }
1178
1179    #[tokio::test]
1180    async fn forwards_responses_to_consensus_engine() {
1181        let (mut handle, api) = setup_engine_api();
1182
1183        tokio::spawn(async move {
1184            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1185            let execution_data = ExecutionData {
1186                payload: payload_v1.into(),
1187                sidecar: ExecutionPayloadSidecar::none(),
1188            };
1189
1190            api.new_payload_v1(execution_data).await.unwrap();
1191        });
1192        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1193    }
1194
1195    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
1196    mod get_payload_bodies {
1197        use super::*;
1198        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1199        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1200
1201        #[tokio::test]
1202        async fn invalid_params() {
1203            let (_, api) = setup_engine_api();
1204
1205            let by_range_tests = [
1206                // (start, count)
1207                (0, 0),
1208                (0, 1),
1209                (1, 0),
1210            ];
1211
1212            // test [EngineApiMessage::GetPayloadBodiesByRange]
1213            for (start, count) in by_range_tests {
1214                let res = api.get_payload_bodies_by_range_v1(start, count).await;
1215                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1216            }
1217        }
1218
1219        #[tokio::test]
1220        async fn request_too_large() {
1221            let (_, api) = setup_engine_api();
1222
1223            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1224            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1225            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1226        }
1227
1228        #[tokio::test]
1229        async fn returns_payload_bodies() {
1230            let mut rng = generators::rng();
1231            let (handle, api) = setup_engine_api();
1232
1233            let (start, count) = (1, 10);
1234            let blocks = random_block_range(
1235                &mut rng,
1236                start..=start + count - 1,
1237                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1238            );
1239            handle
1240                .provider
1241                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1242
1243            let expected = blocks
1244                .iter()
1245                .cloned()
1246                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1247                .collect::<Vec<_>>();
1248
1249            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1250            assert_eq!(res, expected);
1251        }
1252
1253        #[tokio::test]
1254        async fn returns_payload_bodies_with_gaps() {
1255            let mut rng = generators::rng();
1256            let (handle, api) = setup_engine_api();
1257
1258            let (start, count) = (1, 100);
1259            let blocks = random_block_range(
1260                &mut rng,
1261                start..=start + count - 1,
1262                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1263            );
1264
1265            // Insert only blocks in ranges 1-25 and 50-75
1266            let first_missing_range = 26..=50;
1267            let second_missing_range = 76..=100;
1268            handle.provider.extend_blocks(
1269                blocks
1270                    .iter()
1271                    .filter(|b| {
1272                        !first_missing_range.contains(&b.number) &&
1273                            !second_missing_range.contains(&b.number)
1274                    })
1275                    .map(|b| (b.hash(), b.clone().into_block())),
1276            );
1277
1278            let expected = blocks
1279                .iter()
1280                // filter anything after the second missing range to ensure we don't expect trailing
1281                // `None`s
1282                .filter(|b| !second_missing_range.contains(&b.number))
1283                .cloned()
1284                .map(|b| {
1285                    if first_missing_range.contains(&b.number) {
1286                        None
1287                    } else {
1288                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1289                    }
1290                })
1291                .collect::<Vec<_>>();
1292
1293            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1294            assert_eq!(res, expected);
1295
1296            let expected = blocks
1297                .iter()
1298                .cloned()
1299                // ensure we still return trailing `None`s here because by-hash will not be aware
1300                // of the missing block's number, and cannot compare it to the current best block
1301                .map(|b| {
1302                    if first_missing_range.contains(&b.number) ||
1303                        second_missing_range.contains(&b.number)
1304                    {
1305                        None
1306                    } else {
1307                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1308                    }
1309                })
1310                .collect::<Vec<_>>();
1311
1312            let hashes = blocks.iter().map(|b| b.hash()).collect();
1313            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1314            assert_eq!(res, expected);
1315        }
1316    }
1317}