reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use reth_chainspec::EthereumHardforks;
20use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
21use reth_payload_builder::PayloadStore;
22use reth_payload_primitives::{
23    validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
24    PayloadOrAttributes, PayloadTypes,
25};
26use reth_primitives_traits::{Block, BlockBody};
27use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
28use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
29use reth_tasks::TaskSpawner;
30use reth_transaction_pool::TransactionPool;
31use std::{
32    sync::Arc,
33    time::{Instant, SystemTime},
34};
35use tokio::sync::oneshot;
36use tracing::{debug, trace, warn};
37
38/// The Engine API response sender.
39pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
40
41/// The upper limit for payload bodies request.
42const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
43
44/// The upper limit for blobs in `engine_getBlobsVx`.
45const MAX_BLOB_LIMIT: usize = 128;
46
47/// The Engine API implementation that grants the Consensus layer access to data and
48/// functions in the Execution layer that are crucial for the consensus process.
49///
50/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
51/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
52/// are still follow the engine API model.
53///
54/// ## Implementers
55///
56/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
57/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
58/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
59/// endpoints (e.g. opstack).
60/// See also [`EngineApiServer`] implementation for this type which is the
61/// L1 implementation.
62pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
63    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
64}
65
66impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
67    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
68{
69    /// Returns the configured chainspec.
70    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
71        &self.inner.chain_spec
72    }
73}
74
75impl<Provider, PayloadT, Pool, Validator, ChainSpec>
76    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
77where
78    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
79    PayloadT: PayloadTypes,
80    Pool: TransactionPool + 'static,
81    Validator: EngineApiValidator<PayloadT>,
82    ChainSpec: EthereumHardforks + Send + Sync + 'static,
83{
84    /// Create new instance of [`EngineApi`].
85    #[expect(clippy::too_many_arguments)]
86    pub fn new(
87        provider: Provider,
88        chain_spec: Arc<ChainSpec>,
89        beacon_consensus: ConsensusEngineHandle<PayloadT>,
90        payload_store: PayloadStore<PayloadT>,
91        tx_pool: Pool,
92        task_spawner: Box<dyn TaskSpawner>,
93        client: ClientVersionV1,
94        capabilities: EngineCapabilities,
95        validator: Validator,
96        accept_execution_requests_hash: bool,
97    ) -> Self {
98        let inner = Arc::new(EngineApiInner {
99            provider,
100            chain_spec,
101            beacon_consensus,
102            payload_store,
103            task_spawner,
104            metrics: EngineApiMetrics::default(),
105            client,
106            capabilities,
107            tx_pool,
108            validator,
109            accept_execution_requests_hash,
110        });
111        Self { inner }
112    }
113
114    /// Fetches the client version.
115    pub fn get_client_version_v1(
116        &self,
117        _client: ClientVersionV1,
118    ) -> EngineApiResult<Vec<ClientVersionV1>> {
119        Ok(vec![self.inner.client.clone()])
120    }
121
122    /// Fetches the timestamp of the payload with the given id.
123    async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
124        Ok(self
125            .inner
126            .payload_store
127            .payload_timestamp(payload_id)
128            .await
129            .ok_or(EngineApiError::UnknownPayload)??)
130    }
131
132    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
133    /// Caution: This should not accept the `withdrawals` field
134    pub async fn new_payload_v1(
135        &self,
136        payload: PayloadT::ExecutionData,
137    ) -> EngineApiResult<PayloadStatus> {
138        let payload_or_attrs = PayloadOrAttributes::<
139            '_,
140            PayloadT::ExecutionData,
141            PayloadT::PayloadAttributes,
142        >::from_execution_payload(&payload);
143
144        self.inner
145            .validator
146            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
147
148        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
149    }
150
151    /// Metered version of `new_payload_v1`.
152    pub async fn new_payload_v1_metered(
153        &self,
154        payload: PayloadT::ExecutionData,
155    ) -> EngineApiResult<PayloadStatus> {
156        let start = Instant::now();
157        let res = Self::new_payload_v1(self, payload).await;
158        let elapsed = start.elapsed();
159        self.inner.metrics.latency.new_payload_v1.record(elapsed);
160        res
161    }
162
163    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
164    pub async fn new_payload_v2(
165        &self,
166        payload: PayloadT::ExecutionData,
167    ) -> EngineApiResult<PayloadStatus> {
168        let payload_or_attrs = PayloadOrAttributes::<
169            '_,
170            PayloadT::ExecutionData,
171            PayloadT::PayloadAttributes,
172        >::from_execution_payload(&payload);
173        self.inner
174            .validator
175            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
176        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
177    }
178
179    /// Metered version of `new_payload_v2`.
180    pub async fn new_payload_v2_metered(
181        &self,
182        payload: PayloadT::ExecutionData,
183    ) -> EngineApiResult<PayloadStatus> {
184        let start = Instant::now();
185        let res = Self::new_payload_v2(self, payload).await;
186        let elapsed = start.elapsed();
187        self.inner.metrics.latency.new_payload_v2.record(elapsed);
188        res
189    }
190
191    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
192    pub async fn new_payload_v3(
193        &self,
194        payload: PayloadT::ExecutionData,
195    ) -> EngineApiResult<PayloadStatus> {
196        let payload_or_attrs = PayloadOrAttributes::<
197            '_,
198            PayloadT::ExecutionData,
199            PayloadT::PayloadAttributes,
200        >::from_execution_payload(&payload);
201        self.inner
202            .validator
203            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
204
205        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
206    }
207
208    /// Metrics version of `new_payload_v3`
209    pub async fn new_payload_v3_metered(
210        &self,
211        payload: PayloadT::ExecutionData,
212    ) -> RpcResult<PayloadStatus> {
213        let start = Instant::now();
214
215        let res = Self::new_payload_v3(self, payload).await;
216        let elapsed = start.elapsed();
217        self.inner.metrics.latency.new_payload_v3.record(elapsed);
218        Ok(res?)
219    }
220
221    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
222    pub async fn new_payload_v4(
223        &self,
224        payload: PayloadT::ExecutionData,
225    ) -> EngineApiResult<PayloadStatus> {
226        let payload_or_attrs = PayloadOrAttributes::<
227            '_,
228            PayloadT::ExecutionData,
229            PayloadT::PayloadAttributes,
230        >::from_execution_payload(&payload);
231        self.inner
232            .validator
233            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
234
235        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
236    }
237
238    /// Metrics version of `new_payload_v4`
239    pub async fn new_payload_v4_metered(
240        &self,
241        payload: PayloadT::ExecutionData,
242    ) -> RpcResult<PayloadStatus> {
243        let start = Instant::now();
244        let res = Self::new_payload_v4(self, payload).await;
245
246        let elapsed = start.elapsed();
247        self.inner.metrics.latency.new_payload_v4.record(elapsed);
248        Ok(res?)
249    }
250
251    /// Returns whether the engine accepts execution requests hash.
252    pub fn accept_execution_requests_hash(&self) -> bool {
253        self.inner.accept_execution_requests_hash
254    }
255}
256
257impl<Provider, EngineT, Pool, Validator, ChainSpec>
258    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
259where
260    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
261    EngineT: EngineTypes,
262    Pool: TransactionPool + 'static,
263    Validator: EngineApiValidator<EngineT>,
264    ChainSpec: EthereumHardforks + Send + Sync + 'static,
265{
266    /// Sends a message to the beacon consensus engine to update the fork choice _without_
267    /// withdrawals.
268    ///
269    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
270    ///
271    /// Caution: This should not accept the `withdrawals` field
272    pub async fn fork_choice_updated_v1(
273        &self,
274        state: ForkchoiceState,
275        payload_attrs: Option<EngineT::PayloadAttributes>,
276    ) -> EngineApiResult<ForkchoiceUpdated> {
277        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
278            .await
279    }
280
281    /// Metrics version of `fork_choice_updated_v1`
282    pub async fn fork_choice_updated_v1_metered(
283        &self,
284        state: ForkchoiceState,
285        payload_attrs: Option<EngineT::PayloadAttributes>,
286    ) -> EngineApiResult<ForkchoiceUpdated> {
287        let start = Instant::now();
288        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
289        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
290        res
291    }
292
293    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
294    /// but only _after_ shanghai.
295    ///
296    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
297    pub async fn fork_choice_updated_v2(
298        &self,
299        state: ForkchoiceState,
300        payload_attrs: Option<EngineT::PayloadAttributes>,
301    ) -> EngineApiResult<ForkchoiceUpdated> {
302        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
303            .await
304    }
305
306    /// Metrics version of `fork_choice_updated_v2`
307    pub async fn fork_choice_updated_v2_metered(
308        &self,
309        state: ForkchoiceState,
310        payload_attrs: Option<EngineT::PayloadAttributes>,
311    ) -> EngineApiResult<ForkchoiceUpdated> {
312        let start = Instant::now();
313        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
314        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
315        res
316    }
317
318    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
319    /// but only _after_ cancun.
320    ///
321    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
322    pub async fn fork_choice_updated_v3(
323        &self,
324        state: ForkchoiceState,
325        payload_attrs: Option<EngineT::PayloadAttributes>,
326    ) -> EngineApiResult<ForkchoiceUpdated> {
327        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
328            .await
329    }
330
331    /// Metrics version of `fork_choice_updated_v3`
332    pub async fn fork_choice_updated_v3_metered(
333        &self,
334        state: ForkchoiceState,
335        payload_attrs: Option<EngineT::PayloadAttributes>,
336    ) -> EngineApiResult<ForkchoiceUpdated> {
337        let start = Instant::now();
338        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
339        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
340        res
341    }
342
343    /// Helper function for retrieving the build payload by id.
344    async fn get_built_payload(
345        &self,
346        payload_id: PayloadId,
347    ) -> EngineApiResult<EngineT::BuiltPayload> {
348        self.inner
349            .payload_store
350            .resolve(payload_id)
351            .await
352            .ok_or(EngineApiError::UnknownPayload)?
353            .map_err(|_| EngineApiError::UnknownPayload)
354    }
355
356    /// Helper function for validating the payload timestamp and retrieving & converting the payload
357    /// into desired envelope.
358    async fn get_payload_inner<R>(
359        &self,
360        payload_id: PayloadId,
361        version: EngineApiMessageVersion,
362    ) -> EngineApiResult<R>
363    where
364        EngineT::BuiltPayload: TryInto<R>,
365    {
366        // Validate timestamp according to engine rules
367        // Enforces Osaka restrictions on `getPayloadV4`.
368        let timestamp = self.get_payload_timestamp(payload_id).await?;
369        validate_payload_timestamp(
370            &self.inner.chain_spec,
371            version,
372            timestamp,
373            MessageValidationKind::GetPayload,
374        )?;
375
376        // Now resolve the payload
377        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
378            warn!(?version, "could not transform built payload");
379            EngineApiError::UnknownPayload
380        })
381    }
382
383    /// Returns the most recent version of the payload that is available in the corresponding
384    /// payload build process at the time of receiving this call.
385    ///
386    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
387    ///
388    /// Caution: This should not return the `withdrawals` field
389    ///
390    /// Note:
391    /// > Provider software MAY stop the corresponding build process after serving this call.
392    pub async fn get_payload_v1(
393        &self,
394        payload_id: PayloadId,
395    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
396        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
397            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
398            EngineApiError::UnknownPayload
399        })
400    }
401
402    /// Metrics version of `get_payload_v1`
403    pub async fn get_payload_v1_metered(
404        &self,
405        payload_id: PayloadId,
406    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
407        let start = Instant::now();
408        let res = Self::get_payload_v1(self, payload_id).await;
409        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
410        res
411    }
412
413    /// Returns the most recent version of the payload that is available in the corresponding
414    /// payload build process at the time of receiving this call.
415    ///
416    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
417    ///
418    /// Note:
419    /// > Provider software MAY stop the corresponding build process after serving this call.
420    pub async fn get_payload_v2(
421        &self,
422        payload_id: PayloadId,
423    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
424        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
425    }
426
427    /// Metrics version of `get_payload_v2`
428    pub async fn get_payload_v2_metered(
429        &self,
430        payload_id: PayloadId,
431    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
432        let start = Instant::now();
433        let res = Self::get_payload_v2(self, payload_id).await;
434        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
435        res
436    }
437
438    /// Returns the most recent version of the payload that is available in the corresponding
439    /// payload build process at the time of receiving this call.
440    ///
441    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
442    ///
443    /// Note:
444    /// > Provider software MAY stop the corresponding build process after serving this call.
445    pub async fn get_payload_v3(
446        &self,
447        payload_id: PayloadId,
448    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
449        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
450    }
451
452    /// Metrics version of `get_payload_v3`
453    pub async fn get_payload_v3_metered(
454        &self,
455        payload_id: PayloadId,
456    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
457        let start = Instant::now();
458        let res = Self::get_payload_v3(self, payload_id).await;
459        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
460        res
461    }
462
463    /// Returns the most recent version of the payload that is available in the corresponding
464    /// payload build process at the time of receiving this call.
465    ///
466    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
467    ///
468    /// Note:
469    /// > Provider software MAY stop the corresponding build process after serving this call.
470    pub async fn get_payload_v4(
471        &self,
472        payload_id: PayloadId,
473    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
474        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
475    }
476
477    /// Metrics version of `get_payload_v4`
478    pub async fn get_payload_v4_metered(
479        &self,
480        payload_id: PayloadId,
481    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
482        let start = Instant::now();
483        let res = Self::get_payload_v4(self, payload_id).await;
484        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
485        res
486    }
487
488    /// Handler for `engine_getPayloadV5`
489    ///
490    /// Returns the most recent version of the payload that is available in the corresponding
491    /// payload build process at the time of receiving this call.
492    ///
493    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
494    ///
495    /// Note:
496    /// > Provider software MAY stop the corresponding build process after serving this call.
497    pub async fn get_payload_v5(
498        &self,
499        payload_id: PayloadId,
500    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
501        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
502    }
503
504    /// Metrics version of `get_payload_v5`
505    pub async fn get_payload_v5_metered(
506        &self,
507        payload_id: PayloadId,
508    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
509        let start = Instant::now();
510        let res = Self::get_payload_v5(self, payload_id).await;
511        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
512        res
513    }
514
515    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
516    /// blocks and returns the mapped payload bodies.
517    pub async fn get_payload_bodies_by_range_with<F, R>(
518        &self,
519        start: BlockNumber,
520        count: u64,
521        f: F,
522    ) -> EngineApiResult<Vec<Option<R>>>
523    where
524        F: Fn(Provider::Block) -> R + Send + 'static,
525        R: Send + 'static,
526    {
527        let (tx, rx) = oneshot::channel();
528        let inner = self.inner.clone();
529
530        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
531            if count > MAX_PAYLOAD_BODIES_LIMIT {
532                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
533                return;
534            }
535
536            if start == 0 || count == 0 {
537                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
538                return;
539            }
540
541            let mut result = Vec::with_capacity(count as usize);
542
543            // -1 so range is inclusive
544            let mut end = start.saturating_add(count - 1);
545
546            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
547            // truncate the end if it's greater than the last block
548            if let Ok(best_block) = inner.provider.best_block_number()
549                && end > best_block {
550                    end = best_block;
551                }
552
553            // Check if the requested range starts before the earliest available block due to pruning/expiry
554            let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
555            for num in start..=end {
556                if num < earliest_block {
557                    result.push(None);
558                    continue;
559                }
560                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
561                match block_result {
562                    Ok(block) => {
563                        result.push(block.map(&f));
564                    }
565                    Err(err) => {
566                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
567                        return;
568                    }
569                };
570            }
571            tx.send(Ok(result)).ok();
572        }));
573
574        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
575    }
576
577    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
578    /// blocks.
579    ///
580    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
581    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
582    /// adversarial.
583    ///
584    /// Implementers should take care when acting on the input to this method, specifically
585    /// ensuring that the range is limited properly, and that the range boundaries are computed
586    /// correctly and without panics.
587    pub async fn get_payload_bodies_by_range_v1(
588        &self,
589        start: BlockNumber,
590        count: u64,
591    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
592        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
593            transactions: block.body().encoded_2718_transactions(),
594            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
595        })
596        .await
597    }
598
599    /// Metrics version of `get_payload_bodies_by_range_v1`
600    pub async fn get_payload_bodies_by_range_v1_metered(
601        &self,
602        start: BlockNumber,
603        count: u64,
604    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
605        let start_time = Instant::now();
606        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
607        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
608        res
609    }
610
611    /// Called to retrieve execution payload bodies by hashes.
612    pub async fn get_payload_bodies_by_hash_with<F, R>(
613        &self,
614        hashes: Vec<BlockHash>,
615        f: F,
616    ) -> EngineApiResult<Vec<Option<R>>>
617    where
618        F: Fn(Provider::Block) -> R + Send + 'static,
619        R: Send + 'static,
620    {
621        let len = hashes.len() as u64;
622        if len > MAX_PAYLOAD_BODIES_LIMIT {
623            return Err(EngineApiError::PayloadRequestTooLarge { len });
624        }
625
626        let (tx, rx) = oneshot::channel();
627        let inner = self.inner.clone();
628
629        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
630            let mut result = Vec::with_capacity(hashes.len());
631            for hash in hashes {
632                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
633                match block_result {
634                    Ok(block) => {
635                        result.push(block.map(&f));
636                    }
637                    Err(err) => {
638                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
639                        return;
640                    }
641                }
642            }
643            tx.send(Ok(result)).ok();
644        }));
645
646        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
647    }
648
649    /// Called to retrieve execution payload bodies by hashes.
650    pub async fn get_payload_bodies_by_hash_v1(
651        &self,
652        hashes: Vec<BlockHash>,
653    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
654        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
655            transactions: block.body().encoded_2718_transactions(),
656            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
657        })
658        .await
659    }
660
661    /// Metrics version of `get_payload_bodies_by_hash_v1`
662    pub async fn get_payload_bodies_by_hash_v1_metered(
663        &self,
664        hashes: Vec<BlockHash>,
665    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
666        let start = Instant::now();
667        let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
668        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
669        res
670    }
671
672    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
673    /// update.
674    ///
675    /// The payload attributes will be validated according to the engine API rules for the given
676    /// message version:
677    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
678    ///   validated according to the Paris rules.
679    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
680    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
681    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
682    ///
683    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
684    ///   validated according to the Cancun rules.
685    async fn validate_and_execute_forkchoice(
686        &self,
687        version: EngineApiMessageVersion,
688        state: ForkchoiceState,
689        payload_attrs: Option<EngineT::PayloadAttributes>,
690    ) -> EngineApiResult<ForkchoiceUpdated> {
691        if let Some(ref attrs) = payload_attrs {
692            let attr_validation_res =
693                self.inner.validator.ensure_well_formed_attributes(version, attrs);
694
695            // From the engine API spec:
696            //
697            // Client software MUST ensure that payloadAttributes.timestamp is greater than
698            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
699            // isn't held client software MUST respond with -38003: Invalid payload attributes and
700            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
701            // update MUST NOT be rolled back.
702            //
703            // NOTE: This will also apply to the validation result for the cancun or
704            // shanghai-specific fields provided in the payload attributes.
705            //
706            // To do this, we set the payload attrs to `None` if attribute validation failed, but
707            // we still apply the forkchoice update.
708            if let Err(err) = attr_validation_res {
709                let fcu_res =
710                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
711                // TODO: decide if we want this branch - the FCU INVALID response might be more
712                // useful than the payload attributes INVALID response
713                if fcu_res.is_invalid() {
714                    return Ok(fcu_res)
715                }
716                return Err(err.into())
717            }
718        }
719
720        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
721    }
722
723    /// Returns reference to supported capabilities.
724    pub fn capabilities(&self) -> &EngineCapabilities {
725        &self.inner.capabilities
726    }
727
728    fn get_blobs_v1(
729        &self,
730        versioned_hashes: Vec<B256>,
731    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
732        // Only allow this method before Osaka fork
733        let current_timestamp =
734            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
735        if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
736            return Err(EngineApiError::EngineObjectValidationError(
737                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
738            ));
739        }
740
741        if versioned_hashes.len() > MAX_BLOB_LIMIT {
742            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
743        }
744
745        self.inner
746            .tx_pool
747            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
748            .map_err(|err| EngineApiError::Internal(Box::new(err)))
749    }
750
751    /// Metered version of `get_blobs_v1`.
752    pub fn get_blobs_v1_metered(
753        &self,
754        versioned_hashes: Vec<B256>,
755    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
756        let hashes_len = versioned_hashes.len();
757        let start = Instant::now();
758        let res = Self::get_blobs_v1(self, versioned_hashes);
759        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
760
761        if let Ok(blobs) = &res {
762            let blobs_found = blobs.iter().flatten().count();
763            let blobs_missed = hashes_len - blobs_found;
764
765            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
766            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
767        }
768
769        res
770    }
771
772    fn get_blobs_v2(
773        &self,
774        versioned_hashes: Vec<B256>,
775    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
776        // Check if Osaka fork is active
777        let current_timestamp =
778            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
779        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
780            return Err(EngineApiError::EngineObjectValidationError(
781                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
782            ));
783        }
784
785        if versioned_hashes.len() > MAX_BLOB_LIMIT {
786            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
787        }
788
789        self.inner
790            .tx_pool
791            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
792            .map_err(|err| EngineApiError::Internal(Box::new(err)))
793    }
794
795    /// Metered version of `get_blobs_v2`.
796    pub fn get_blobs_v2_metered(
797        &self,
798        versioned_hashes: Vec<B256>,
799    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
800        let hashes_len = versioned_hashes.len();
801        let start = Instant::now();
802        let res = Self::get_blobs_v2(self, versioned_hashes);
803        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
804
805        if let Ok(blobs) = &res {
806            let blobs_found = blobs.iter().flatten().count();
807
808            self.inner
809                .metrics
810                .blob_metrics
811                .get_blobs_requests_blobs_total
812                .increment(hashes_len as u64);
813            self.inner
814                .metrics
815                .blob_metrics
816                .get_blobs_requests_blobs_in_blobpool_total
817                .increment(blobs_found as u64);
818
819            if blobs_found == hashes_len {
820                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
821            } else {
822                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
823            }
824        } else {
825            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
826        }
827
828        res
829    }
830}
831
832// This is the concrete ethereum engine API implementation.
833#[async_trait]
834impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
835    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
836where
837    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
838    EngineT: EngineTypes<ExecutionData = ExecutionData>,
839    Pool: TransactionPool + 'static,
840    Validator: EngineApiValidator<EngineT>,
841    ChainSpec: EthereumHardforks + Send + Sync + 'static,
842{
843    /// Handler for `engine_newPayloadV1`
844    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
845    /// Caution: This should not accept the `withdrawals` field
846    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
847        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
848        let payload =
849            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
850        Ok(self.new_payload_v1_metered(payload).await?)
851    }
852
853    /// Handler for `engine_newPayloadV2`
854    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
855    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
856        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
857        let payload = ExecutionData {
858            payload: payload.into_payload(),
859            sidecar: ExecutionPayloadSidecar::none(),
860        };
861
862        Ok(self.new_payload_v2_metered(payload).await?)
863    }
864
865    /// Handler for `engine_newPayloadV3`
866    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
867    async fn new_payload_v3(
868        &self,
869        payload: ExecutionPayloadV3,
870        versioned_hashes: Vec<B256>,
871        parent_beacon_block_root: B256,
872    ) -> RpcResult<PayloadStatus> {
873        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
874        let payload = ExecutionData {
875            payload: payload.into(),
876            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
877                versioned_hashes,
878                parent_beacon_block_root,
879            }),
880        };
881
882        Ok(self.new_payload_v3_metered(payload).await?)
883    }
884
885    /// Handler for `engine_newPayloadV4`
886    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
887    async fn new_payload_v4(
888        &self,
889        payload: ExecutionPayloadV3,
890        versioned_hashes: Vec<B256>,
891        parent_beacon_block_root: B256,
892        requests: RequestsOrHash,
893    ) -> RpcResult<PayloadStatus> {
894        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
895
896        // Accept requests as a hash only if it is explicitly allowed
897        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
898            return Err(EngineApiError::UnexpectedRequestsHash.into());
899        }
900
901        let payload = ExecutionData {
902            payload: payload.into(),
903            sidecar: ExecutionPayloadSidecar::v4(
904                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
905                PraguePayloadFields { requests },
906            ),
907        };
908
909        Ok(self.new_payload_v4_metered(payload).await?)
910    }
911
912    /// Handler for `engine_forkchoiceUpdatedV1`
913    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
914    ///
915    /// Caution: This should not accept the `withdrawals` field
916    async fn fork_choice_updated_v1(
917        &self,
918        fork_choice_state: ForkchoiceState,
919        payload_attributes: Option<EngineT::PayloadAttributes>,
920    ) -> RpcResult<ForkchoiceUpdated> {
921        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
922        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
923    }
924
925    /// Handler for `engine_forkchoiceUpdatedV2`
926    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
927    async fn fork_choice_updated_v2(
928        &self,
929        fork_choice_state: ForkchoiceState,
930        payload_attributes: Option<EngineT::PayloadAttributes>,
931    ) -> RpcResult<ForkchoiceUpdated> {
932        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
933        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
934    }
935
936    /// Handler for `engine_forkchoiceUpdatedV2`
937    ///
938    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
939    async fn fork_choice_updated_v3(
940        &self,
941        fork_choice_state: ForkchoiceState,
942        payload_attributes: Option<EngineT::PayloadAttributes>,
943    ) -> RpcResult<ForkchoiceUpdated> {
944        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
945        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
946    }
947
948    /// Handler for `engine_getPayloadV1`
949    ///
950    /// Returns the most recent version of the payload that is available in the corresponding
951    /// payload build process at the time of receiving this call.
952    ///
953    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
954    ///
955    /// Caution: This should not return the `withdrawals` field
956    ///
957    /// Note:
958    /// > Provider software MAY stop the corresponding build process after serving this call.
959    async fn get_payload_v1(
960        &self,
961        payload_id: PayloadId,
962    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
963        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
964        Ok(self.get_payload_v1_metered(payload_id).await?)
965    }
966
967    /// Handler for `engine_getPayloadV2`
968    ///
969    /// Returns the most recent version of the payload that is available in the corresponding
970    /// payload build process at the time of receiving this call.
971    ///
972    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
973    ///
974    /// Note:
975    /// > Provider software MAY stop the corresponding build process after serving this call.
976    async fn get_payload_v2(
977        &self,
978        payload_id: PayloadId,
979    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
980        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
981        Ok(self.get_payload_v2_metered(payload_id).await?)
982    }
983
984    /// Handler for `engine_getPayloadV3`
985    ///
986    /// Returns the most recent version of the payload that is available in the corresponding
987    /// payload build process at the time of receiving this call.
988    ///
989    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
990    ///
991    /// Note:
992    /// > Provider software MAY stop the corresponding build process after serving this call.
993    async fn get_payload_v3(
994        &self,
995        payload_id: PayloadId,
996    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
997        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
998        Ok(self.get_payload_v3_metered(payload_id).await?)
999    }
1000
1001    /// Handler for `engine_getPayloadV4`
1002    ///
1003    /// Returns the most recent version of the payload that is available in the corresponding
1004    /// payload build process at the time of receiving this call.
1005    ///
1006    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1007    ///
1008    /// Note:
1009    /// > Provider software MAY stop the corresponding build process after serving this call.
1010    async fn get_payload_v4(
1011        &self,
1012        payload_id: PayloadId,
1013    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1014        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1015        Ok(self.get_payload_v4_metered(payload_id).await?)
1016    }
1017
1018    /// Handler for `engine_getPayloadV5`
1019    ///
1020    /// Returns the most recent version of the payload that is available in the corresponding
1021    /// payload build process at the time of receiving this call.
1022    ///
1023    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1024    ///
1025    /// Note:
1026    /// > Provider software MAY stop the corresponding build process after serving this call.
1027    async fn get_payload_v5(
1028        &self,
1029        payload_id: PayloadId,
1030    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1031        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1032        Ok(self.get_payload_v5_metered(payload_id).await?)
1033    }
1034
1035    /// Handler for `engine_getPayloadBodiesByHashV1`
1036    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1037    async fn get_payload_bodies_by_hash_v1(
1038        &self,
1039        block_hashes: Vec<BlockHash>,
1040    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1041        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1042        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1043    }
1044
1045    /// Handler for `engine_getPayloadBodiesByRangeV1`
1046    ///
1047    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1048    ///
1049    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1050    /// blocks.
1051    ///
1052    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
1053    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1054    /// adversarial.
1055    ///
1056    /// Implementers should take care when acting on the input to this method, specifically
1057    /// ensuring that the range is limited properly, and that the range boundaries are computed
1058    /// correctly and without panics.
1059    ///
1060    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1061    async fn get_payload_bodies_by_range_v1(
1062        &self,
1063        start: U64,
1064        count: U64,
1065    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1066        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1067        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1068    }
1069
1070    /// Handler for `engine_getClientVersionV1`
1071    ///
1072    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1073    async fn get_client_version_v1(
1074        &self,
1075        client: ClientVersionV1,
1076    ) -> RpcResult<Vec<ClientVersionV1>> {
1077        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1078        Ok(Self::get_client_version_v1(self, client)?)
1079    }
1080
1081    /// Handler for `engine_exchangeCapabilitiesV1`
1082    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1083    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1084        Ok(self.capabilities().list())
1085    }
1086
1087    async fn get_blobs_v1(
1088        &self,
1089        versioned_hashes: Vec<B256>,
1090    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1091        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1092        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1093    }
1094
1095    async fn get_blobs_v2(
1096        &self,
1097        versioned_hashes: Vec<B256>,
1098    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1099        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1100        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1101    }
1102}
1103
1104impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1105    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1106where
1107    EngineT: EngineTypes,
1108    Self: EngineApiServer<EngineT>,
1109{
1110    fn into_rpc_module(self) -> RpcModule<()> {
1111        self.into_rpc().remove_context()
1112    }
1113}
1114
1115impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1116    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1117where
1118    PayloadT: PayloadTypes,
1119{
1120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1121        f.debug_struct("EngineApi").finish_non_exhaustive()
1122    }
1123}
1124
1125impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1126    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1127where
1128    PayloadT: PayloadTypes,
1129{
1130    fn clone(&self) -> Self {
1131        Self { inner: Arc::clone(&self.inner) }
1132    }
1133}
1134
1135/// The container type for the engine API internals.
1136struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1137    /// The provider to interact with the chain.
1138    provider: Provider,
1139    /// Consensus configuration
1140    chain_spec: Arc<ChainSpec>,
1141    /// The channel to send messages to the beacon consensus engine.
1142    beacon_consensus: ConsensusEngineHandle<PayloadT>,
1143    /// The type that can communicate with the payload service to retrieve payloads.
1144    payload_store: PayloadStore<PayloadT>,
1145    /// For spawning and executing async tasks
1146    task_spawner: Box<dyn TaskSpawner>,
1147    /// The latency and response type metrics for engine api calls
1148    metrics: EngineApiMetrics,
1149    /// Identification of the execution client used by the consensus client
1150    client: ClientVersionV1,
1151    /// The list of all supported Engine capabilities available over the engine endpoint.
1152    capabilities: EngineCapabilities,
1153    /// Transaction pool.
1154    tx_pool: Pool,
1155    /// Engine validator.
1156    validator: Validator,
1157    accept_execution_requests_hash: bool,
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162    use super::*;
1163    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1164    use assert_matches::assert_matches;
1165    use reth_chainspec::{ChainSpec, MAINNET};
1166    use reth_engine_primitives::BeaconEngineMessage;
1167    use reth_ethereum_engine_primitives::EthEngineTypes;
1168    use reth_ethereum_primitives::Block;
1169    use reth_node_ethereum::EthereumEngineValidator;
1170    use reth_payload_builder::test_utils::spawn_test_payload_service;
1171    use reth_provider::test_utils::MockEthProvider;
1172    use reth_tasks::TokioTaskExecutor;
1173    use reth_transaction_pool::noop::NoopTransactionPool;
1174    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1175
1176    fn setup_engine_api() -> (
1177        EngineApiTestHandle,
1178        EngineApi<
1179            Arc<MockEthProvider>,
1180            EthEngineTypes,
1181            NoopTransactionPool,
1182            EthereumEngineValidator,
1183            ChainSpec,
1184        >,
1185    ) {
1186        let client = ClientVersionV1 {
1187            code: ClientCode::RH,
1188            name: "Reth".to_string(),
1189            version: "v0.2.0-beta.5".to_string(),
1190            commit: "defa64b2".to_string(),
1191        };
1192
1193        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1194        let provider = Arc::new(MockEthProvider::default());
1195        let payload_store = spawn_test_payload_service();
1196        let (to_engine, engine_rx) = unbounded_channel();
1197        let task_executor = Box::<TokioTaskExecutor>::default();
1198        let api = EngineApi::new(
1199            provider.clone(),
1200            chain_spec.clone(),
1201            ConsensusEngineHandle::new(to_engine),
1202            payload_store.into(),
1203            NoopTransactionPool::default(),
1204            task_executor,
1205            client,
1206            EngineCapabilities::default(),
1207            EthereumEngineValidator::new(chain_spec.clone()),
1208            false,
1209        );
1210        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1211        (handle, api)
1212    }
1213
1214    #[tokio::test]
1215    async fn engine_client_version_v1() {
1216        let client = ClientVersionV1 {
1217            code: ClientCode::RH,
1218            name: "Reth".to_string(),
1219            version: "v0.2.0-beta.5".to_string(),
1220            commit: "defa64b2".to_string(),
1221        };
1222        let (_, api) = setup_engine_api();
1223        let res = api.get_client_version_v1(client.clone());
1224        assert_eq!(res.unwrap(), vec![client]);
1225    }
1226
1227    struct EngineApiTestHandle {
1228        #[allow(dead_code)]
1229        chain_spec: Arc<ChainSpec>,
1230        provider: Arc<MockEthProvider>,
1231        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1232    }
1233
1234    #[tokio::test]
1235    async fn forwards_responses_to_consensus_engine() {
1236        let (mut handle, api) = setup_engine_api();
1237
1238        tokio::spawn(async move {
1239            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1240            let execution_data = ExecutionData {
1241                payload: payload_v1.into(),
1242                sidecar: ExecutionPayloadSidecar::none(),
1243            };
1244
1245            api.new_payload_v1(execution_data).await.unwrap();
1246        });
1247        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1248    }
1249
1250    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
1251    mod get_payload_bodies {
1252        use super::*;
1253        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1254        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1255
1256        #[tokio::test]
1257        async fn invalid_params() {
1258            let (_, api) = setup_engine_api();
1259
1260            let by_range_tests = [
1261                // (start, count)
1262                (0, 0),
1263                (0, 1),
1264                (1, 0),
1265            ];
1266
1267            // test [EngineApiMessage::GetPayloadBodiesByRange]
1268            for (start, count) in by_range_tests {
1269                let res = api.get_payload_bodies_by_range_v1(start, count).await;
1270                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1271            }
1272        }
1273
1274        #[tokio::test]
1275        async fn request_too_large() {
1276            let (_, api) = setup_engine_api();
1277
1278            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1279            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1280            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1281        }
1282
1283        #[tokio::test]
1284        async fn returns_payload_bodies() {
1285            let mut rng = generators::rng();
1286            let (handle, api) = setup_engine_api();
1287
1288            let (start, count) = (1, 10);
1289            let blocks = random_block_range(
1290                &mut rng,
1291                start..=start + count - 1,
1292                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1293            );
1294            handle
1295                .provider
1296                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1297
1298            let expected = blocks
1299                .iter()
1300                .cloned()
1301                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1302                .collect::<Vec<_>>();
1303
1304            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1305            assert_eq!(res, expected);
1306        }
1307
1308        #[tokio::test]
1309        async fn returns_payload_bodies_with_gaps() {
1310            let mut rng = generators::rng();
1311            let (handle, api) = setup_engine_api();
1312
1313            let (start, count) = (1, 100);
1314            let blocks = random_block_range(
1315                &mut rng,
1316                start..=start + count - 1,
1317                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1318            );
1319
1320            // Insert only blocks in ranges 1-25 and 50-75
1321            let first_missing_range = 26..=50;
1322            let second_missing_range = 76..=100;
1323            handle.provider.extend_blocks(
1324                blocks
1325                    .iter()
1326                    .filter(|b| {
1327                        !first_missing_range.contains(&b.number) &&
1328                            !second_missing_range.contains(&b.number)
1329                    })
1330                    .map(|b| (b.hash(), b.clone().into_block())),
1331            );
1332
1333            let expected = blocks
1334                .iter()
1335                // filter anything after the second missing range to ensure we don't expect trailing
1336                // `None`s
1337                .filter(|b| !second_missing_range.contains(&b.number))
1338                .cloned()
1339                .map(|b| {
1340                    if first_missing_range.contains(&b.number) {
1341                        None
1342                    } else {
1343                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1344                    }
1345                })
1346                .collect::<Vec<_>>();
1347
1348            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1349            assert_eq!(res, expected);
1350
1351            let expected = blocks
1352                .iter()
1353                .cloned()
1354                // ensure we still return trailing `None`s here because by-hash will not be aware
1355                // of the missing block's number, and cannot compare it to the current best block
1356                .map(|b| {
1357                    if first_missing_range.contains(&b.number) ||
1358                        second_missing_range.contains(&b.number)
1359                    {
1360                        None
1361                    } else {
1362                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1363                    }
1364                })
1365                .collect::<Vec<_>>();
1366
1367            let hashes = blocks.iter().map(|b| b.hash()).collect();
1368            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1369            assert_eq!(res, expected);
1370        }
1371    }
1372}