reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24    validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload, PayloadOrAttributes,
25    PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{sync::Arc, time::Instant};
33use tokio::sync::oneshot;
34use tracing::{debug, trace, warn};
35
36/// The Engine API response sender.
37pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
38
39/// The upper limit for payload bodies request.
40const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
41
42/// The upper limit for blobs in `engine_getBlobsVx`.
43const MAX_BLOB_LIMIT: usize = 128;
44
45/// The Engine API implementation that grants the Consensus layer access to data and
46/// functions in the Execution layer that are crucial for the consensus process.
47///
48/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
49/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
50/// are still follow the engine API model.
51///
52/// ## Implementers
53///
54/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
55/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
56/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
57/// endpoints (e.g. opstack).
58/// See also [`EngineApiServer`] implementation for this type which is the
59/// L1 implementation.
60pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
61    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
62}
63
64impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
65    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
66{
67    /// Returns the configured chainspec.
68    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
69        &self.inner.chain_spec
70    }
71}
72
73impl<Provider, PayloadT, Pool, Validator, ChainSpec>
74    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
75where
76    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
77    PayloadT: PayloadTypes,
78    Pool: TransactionPool + 'static,
79    Validator: EngineApiValidator<PayloadT>,
80    ChainSpec: EthereumHardforks + Send + Sync + 'static,
81{
82    /// Create new instance of [`EngineApi`].
83    #[expect(clippy::too_many_arguments)]
84    pub fn new(
85        provider: Provider,
86        chain_spec: Arc<ChainSpec>,
87        beacon_consensus: ConsensusEngineHandle<PayloadT>,
88        payload_store: PayloadStore<PayloadT>,
89        tx_pool: Pool,
90        task_spawner: Box<dyn TaskSpawner>,
91        client: ClientVersionV1,
92        capabilities: EngineCapabilities,
93        validator: Validator,
94        accept_execution_requests_hash: bool,
95    ) -> Self {
96        let inner = Arc::new(EngineApiInner {
97            provider,
98            chain_spec,
99            beacon_consensus,
100            payload_store,
101            task_spawner,
102            metrics: EngineApiMetrics::default(),
103            client,
104            capabilities,
105            tx_pool,
106            validator,
107            latest_new_payload_response: Mutex::new(None),
108            accept_execution_requests_hash,
109        });
110        Self { inner }
111    }
112
113    /// Fetches the client version.
114    pub fn get_client_version_v1(
115        &self,
116        _client: ClientVersionV1,
117    ) -> EngineApiResult<Vec<ClientVersionV1>> {
118        Ok(vec![self.inner.client.clone()])
119    }
120
121    /// Fetches the timestamp of the payload with the given id.
122    async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
123        Ok(self
124            .inner
125            .payload_store
126            .payload_timestamp(payload_id)
127            .await
128            .ok_or(EngineApiError::UnknownPayload)??)
129    }
130
131    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
132    /// Caution: This should not accept the `withdrawals` field
133    pub async fn new_payload_v1(
134        &self,
135        payload: PayloadT::ExecutionData,
136    ) -> EngineApiResult<PayloadStatus> {
137        let payload_or_attrs = PayloadOrAttributes::<
138            '_,
139            PayloadT::ExecutionData,
140            PayloadT::PayloadAttributes,
141        >::from_execution_payload(&payload);
142
143        self.inner
144            .validator
145            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
146
147        Ok(self
148            .inner
149            .beacon_consensus
150            .new_payload(payload)
151            .await
152            .inspect(|_| self.inner.on_new_payload_response())?)
153    }
154
155    /// Metered version of `new_payload_v1`.
156    pub async fn new_payload_v1_metered(
157        &self,
158        payload: PayloadT::ExecutionData,
159    ) -> EngineApiResult<PayloadStatus> {
160        let start = Instant::now();
161        let gas_used = payload.gas_used();
162
163        let res = Self::new_payload_v1(self, payload).await;
164        let elapsed = start.elapsed();
165        self.inner.metrics.latency.new_payload_v1.record(elapsed);
166        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
167        res
168    }
169
170    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
171    pub async fn new_payload_v2(
172        &self,
173        payload: PayloadT::ExecutionData,
174    ) -> EngineApiResult<PayloadStatus> {
175        let payload_or_attrs = PayloadOrAttributes::<
176            '_,
177            PayloadT::ExecutionData,
178            PayloadT::PayloadAttributes,
179        >::from_execution_payload(&payload);
180        self.inner
181            .validator
182            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
183        Ok(self
184            .inner
185            .beacon_consensus
186            .new_payload(payload)
187            .await
188            .inspect(|_| self.inner.on_new_payload_response())?)
189    }
190
191    /// Metered version of `new_payload_v2`.
192    pub async fn new_payload_v2_metered(
193        &self,
194        payload: PayloadT::ExecutionData,
195    ) -> EngineApiResult<PayloadStatus> {
196        let start = Instant::now();
197        let gas_used = payload.gas_used();
198
199        let res = Self::new_payload_v2(self, payload).await;
200        let elapsed = start.elapsed();
201        self.inner.metrics.latency.new_payload_v2.record(elapsed);
202        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
203        res
204    }
205
206    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
207    pub async fn new_payload_v3(
208        &self,
209        payload: PayloadT::ExecutionData,
210    ) -> EngineApiResult<PayloadStatus> {
211        let payload_or_attrs = PayloadOrAttributes::<
212            '_,
213            PayloadT::ExecutionData,
214            PayloadT::PayloadAttributes,
215        >::from_execution_payload(&payload);
216        self.inner
217            .validator
218            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
219
220        Ok(self
221            .inner
222            .beacon_consensus
223            .new_payload(payload)
224            .await
225            .inspect(|_| self.inner.on_new_payload_response())?)
226    }
227
228    /// Metrics version of `new_payload_v3`
229    pub async fn new_payload_v3_metered(
230        &self,
231        payload: PayloadT::ExecutionData,
232    ) -> RpcResult<PayloadStatus> {
233        let start = Instant::now();
234        let gas_used = payload.gas_used();
235
236        let res = Self::new_payload_v3(self, payload).await;
237        let elapsed = start.elapsed();
238        self.inner.metrics.latency.new_payload_v3.record(elapsed);
239        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
240        Ok(res?)
241    }
242
243    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
244    pub async fn new_payload_v4(
245        &self,
246        payload: PayloadT::ExecutionData,
247    ) -> EngineApiResult<PayloadStatus> {
248        let payload_or_attrs = PayloadOrAttributes::<
249            '_,
250            PayloadT::ExecutionData,
251            PayloadT::PayloadAttributes,
252        >::from_execution_payload(&payload);
253        self.inner
254            .validator
255            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
256
257        Ok(self
258            .inner
259            .beacon_consensus
260            .new_payload(payload)
261            .await
262            .inspect(|_| self.inner.on_new_payload_response())?)
263    }
264
265    /// Metrics version of `new_payload_v4`
266    pub async fn new_payload_v4_metered(
267        &self,
268        payload: PayloadT::ExecutionData,
269    ) -> RpcResult<PayloadStatus> {
270        let start = Instant::now();
271        let gas_used = payload.gas_used();
272
273        let res = Self::new_payload_v4(self, payload).await;
274
275        let elapsed = start.elapsed();
276        self.inner.metrics.latency.new_payload_v4.record(elapsed);
277        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
278        Ok(res?)
279    }
280
281    /// Returns whether the engine accepts execution requests hash.
282    pub fn accept_execution_requests_hash(&self) -> bool {
283        self.inner.accept_execution_requests_hash
284    }
285}
286
287impl<Provider, EngineT, Pool, Validator, ChainSpec>
288    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
289where
290    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
291    EngineT: EngineTypes,
292    Pool: TransactionPool + 'static,
293    Validator: EngineApiValidator<EngineT>,
294    ChainSpec: EthereumHardforks + Send + Sync + 'static,
295{
296    /// Sends a message to the beacon consensus engine to update the fork choice _without_
297    /// withdrawals.
298    ///
299    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
300    ///
301    /// Caution: This should not accept the `withdrawals` field
302    pub async fn fork_choice_updated_v1(
303        &self,
304        state: ForkchoiceState,
305        payload_attrs: Option<EngineT::PayloadAttributes>,
306    ) -> EngineApiResult<ForkchoiceUpdated> {
307        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
308            .await
309    }
310
311    /// Metrics version of `fork_choice_updated_v1`
312    pub async fn fork_choice_updated_v1_metered(
313        &self,
314        state: ForkchoiceState,
315        payload_attrs: Option<EngineT::PayloadAttributes>,
316    ) -> EngineApiResult<ForkchoiceUpdated> {
317        let start = Instant::now();
318        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
319        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
320        self.inner.metrics.fcu_response.update_response_metrics(&res);
321        res
322    }
323
324    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
325    /// but only _after_ shanghai.
326    ///
327    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
328    pub async fn fork_choice_updated_v2(
329        &self,
330        state: ForkchoiceState,
331        payload_attrs: Option<EngineT::PayloadAttributes>,
332    ) -> EngineApiResult<ForkchoiceUpdated> {
333        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
334            .await
335    }
336
337    /// Metrics version of `fork_choice_updated_v2`
338    pub async fn fork_choice_updated_v2_metered(
339        &self,
340        state: ForkchoiceState,
341        payload_attrs: Option<EngineT::PayloadAttributes>,
342    ) -> EngineApiResult<ForkchoiceUpdated> {
343        let start = Instant::now();
344        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
345        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
346        self.inner.metrics.fcu_response.update_response_metrics(&res);
347        res
348    }
349
350    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
351    /// but only _after_ cancun.
352    ///
353    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
354    pub async fn fork_choice_updated_v3(
355        &self,
356        state: ForkchoiceState,
357        payload_attrs: Option<EngineT::PayloadAttributes>,
358    ) -> EngineApiResult<ForkchoiceUpdated> {
359        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
360            .await
361    }
362
363    /// Metrics version of `fork_choice_updated_v3`
364    pub async fn fork_choice_updated_v3_metered(
365        &self,
366        state: ForkchoiceState,
367        payload_attrs: Option<EngineT::PayloadAttributes>,
368    ) -> EngineApiResult<ForkchoiceUpdated> {
369        let start = Instant::now();
370        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
371        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
372        self.inner.metrics.fcu_response.update_response_metrics(&res);
373        res
374    }
375
376    /// Helper function for retrieving the build payload by id.
377    async fn get_built_payload(
378        &self,
379        payload_id: PayloadId,
380    ) -> EngineApiResult<EngineT::BuiltPayload> {
381        self.inner
382            .payload_store
383            .resolve(payload_id)
384            .await
385            .ok_or(EngineApiError::UnknownPayload)?
386            .map_err(|_| EngineApiError::UnknownPayload)
387    }
388
389    /// Helper function for validating the payload timestamp and retrieving & converting the payload
390    /// into desired envelope.
391    async fn get_payload_inner<R>(
392        &self,
393        payload_id: PayloadId,
394        version: EngineApiMessageVersion,
395    ) -> EngineApiResult<R>
396    where
397        EngineT::BuiltPayload: TryInto<R>,
398    {
399        // validate timestamp according to engine rules
400        let timestamp = self.get_payload_timestamp(payload_id).await?;
401        validate_payload_timestamp(&self.inner.chain_spec, version, timestamp)?;
402
403        // Now resolve the payload
404        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
405            warn!(?version, "could not transform built payload");
406            EngineApiError::UnknownPayload
407        })
408    }
409
410    /// Returns the most recent version of the payload that is available in the corresponding
411    /// payload build process at the time of receiving this call.
412    ///
413    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
414    ///
415    /// Caution: This should not return the `withdrawals` field
416    ///
417    /// Note:
418    /// > Provider software MAY stop the corresponding build process after serving this call.
419    pub async fn get_payload_v1(
420        &self,
421        payload_id: PayloadId,
422    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
423        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
424            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
425            EngineApiError::UnknownPayload
426        })
427    }
428
429    /// Metrics version of `get_payload_v1`
430    pub async fn get_payload_v1_metered(
431        &self,
432        payload_id: PayloadId,
433    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
434        let start = Instant::now();
435        let res = Self::get_payload_v1(self, payload_id).await;
436        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
437        res
438    }
439
440    /// Returns the most recent version of the payload that is available in the corresponding
441    /// payload build process at the time of receiving this call.
442    ///
443    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
444    ///
445    /// Note:
446    /// > Provider software MAY stop the corresponding build process after serving this call.
447    pub async fn get_payload_v2(
448        &self,
449        payload_id: PayloadId,
450    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
451        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
452    }
453
454    /// Metrics version of `get_payload_v2`
455    pub async fn get_payload_v2_metered(
456        &self,
457        payload_id: PayloadId,
458    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
459        let start = Instant::now();
460        let res = Self::get_payload_v2(self, payload_id).await;
461        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
462        res
463    }
464
465    /// Returns the most recent version of the payload that is available in the corresponding
466    /// payload build process at the time of receiving this call.
467    ///
468    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
469    ///
470    /// Note:
471    /// > Provider software MAY stop the corresponding build process after serving this call.
472    pub async fn get_payload_v3(
473        &self,
474        payload_id: PayloadId,
475    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
476        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
477    }
478
479    /// Metrics version of `get_payload_v3`
480    pub async fn get_payload_v3_metered(
481        &self,
482        payload_id: PayloadId,
483    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
484        let start = Instant::now();
485        let res = Self::get_payload_v3(self, payload_id).await;
486        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
487        res
488    }
489
490    /// Returns the most recent version of the payload that is available in the corresponding
491    /// payload build process at the time of receiving this call.
492    ///
493    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
494    ///
495    /// Note:
496    /// > Provider software MAY stop the corresponding build process after serving this call.
497    pub async fn get_payload_v4(
498        &self,
499        payload_id: PayloadId,
500    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
501        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
502    }
503
504    /// Metrics version of `get_payload_v4`
505    pub async fn get_payload_v4_metered(
506        &self,
507        payload_id: PayloadId,
508    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
509        let start = Instant::now();
510        let res = Self::get_payload_v4(self, payload_id).await;
511        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
512        res
513    }
514
515    /// Handler for `engine_getPayloadV5`
516    ///
517    /// Returns the most recent version of the payload that is available in the corresponding
518    /// payload build process at the time of receiving this call.
519    ///
520    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
521    ///
522    /// Note:
523    /// > Provider software MAY stop the corresponding build process after serving this call.
524    pub async fn get_payload_v5(
525        &self,
526        payload_id: PayloadId,
527    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
528        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
529    }
530
531    /// Metrics version of `get_payload_v5`
532    pub async fn get_payload_v5_metered(
533        &self,
534        payload_id: PayloadId,
535    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
536        let start = Instant::now();
537        let res = Self::get_payload_v5(self, payload_id).await;
538        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
539        res
540    }
541
542    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
543    /// blocks and returns the mapped payload bodies.
544    pub async fn get_payload_bodies_by_range_with<F, R>(
545        &self,
546        start: BlockNumber,
547        count: u64,
548        f: F,
549    ) -> EngineApiResult<Vec<Option<R>>>
550    where
551        F: Fn(Provider::Block) -> R + Send + 'static,
552        R: Send + 'static,
553    {
554        let (tx, rx) = oneshot::channel();
555        let inner = self.inner.clone();
556
557        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
558            if count > MAX_PAYLOAD_BODIES_LIMIT {
559                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
560                return;
561            }
562
563            if start == 0 || count == 0 {
564                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
565                return;
566            }
567
568            let mut result = Vec::with_capacity(count as usize);
569
570            // -1 so range is inclusive
571            let mut end = start.saturating_add(count - 1);
572
573            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
574            // truncate the end if it's greater than the last block
575            if let Ok(best_block) = inner.provider.best_block_number() {
576                if end > best_block {
577                    end = best_block;
578                }
579            }
580
581            for num in start..=end {
582                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
583                match block_result {
584                    Ok(block) => {
585                        result.push(block.map(&f));
586                    }
587                    Err(err) => {
588                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
589                        return;
590                    }
591                };
592            }
593            tx.send(Ok(result)).ok();
594        }));
595
596        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
597    }
598
599    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
600    /// blocks.
601    ///
602    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
603    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
604    /// adversarial.
605    ///
606    /// Implementers should take care when acting on the input to this method, specifically
607    /// ensuring that the range is limited properly, and that the range boundaries are computed
608    /// correctly and without panics.
609    pub async fn get_payload_bodies_by_range_v1(
610        &self,
611        start: BlockNumber,
612        count: u64,
613    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
614        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
615            transactions: block.body().encoded_2718_transactions(),
616            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
617        })
618        .await
619    }
620
621    /// Metrics version of `get_payload_bodies_by_range_v1`
622    pub async fn get_payload_bodies_by_range_v1_metered(
623        &self,
624        start: BlockNumber,
625        count: u64,
626    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
627        let start_time = Instant::now();
628        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
629        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
630        res
631    }
632
633    /// Called to retrieve execution payload bodies by hashes.
634    pub async fn get_payload_bodies_by_hash_with<F, R>(
635        &self,
636        hashes: Vec<BlockHash>,
637        f: F,
638    ) -> EngineApiResult<Vec<Option<R>>>
639    where
640        F: Fn(Provider::Block) -> R + Send + 'static,
641        R: Send + 'static,
642    {
643        let len = hashes.len() as u64;
644        if len > MAX_PAYLOAD_BODIES_LIMIT {
645            return Err(EngineApiError::PayloadRequestTooLarge { len });
646        }
647
648        let (tx, rx) = oneshot::channel();
649        let inner = self.inner.clone();
650
651        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
652            let mut result = Vec::with_capacity(hashes.len());
653            for hash in hashes {
654                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
655                match block_result {
656                    Ok(block) => {
657                        result.push(block.map(&f));
658                    }
659                    Err(err) => {
660                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
661                        return;
662                    }
663                }
664            }
665            tx.send(Ok(result)).ok();
666        }));
667
668        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
669    }
670
671    /// Called to retrieve execution payload bodies by hashes.
672    pub async fn get_payload_bodies_by_hash_v1(
673        &self,
674        hashes: Vec<BlockHash>,
675    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
676        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
677            transactions: block.body().encoded_2718_transactions(),
678            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
679        })
680        .await
681    }
682
683    /// Metrics version of `get_payload_bodies_by_hash_v1`
684    pub async fn get_payload_bodies_by_hash_v1_metered(
685        &self,
686        hashes: Vec<BlockHash>,
687    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
688        let start = Instant::now();
689        let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
690        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
691        res.await
692    }
693
694    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
695    /// update.
696    ///
697    /// The payload attributes will be validated according to the engine API rules for the given
698    /// message version:
699    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
700    ///   validated according to the Paris rules.
701    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
702    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
703    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
704    ///
705    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
706    ///   validated according to the Cancun rules.
707    async fn validate_and_execute_forkchoice(
708        &self,
709        version: EngineApiMessageVersion,
710        state: ForkchoiceState,
711        payload_attrs: Option<EngineT::PayloadAttributes>,
712    ) -> EngineApiResult<ForkchoiceUpdated> {
713        self.inner.record_elapsed_time_on_fcu();
714
715        if let Some(ref attrs) = payload_attrs {
716            let attr_validation_res =
717                self.inner.validator.ensure_well_formed_attributes(version, attrs);
718
719            // From the engine API spec:
720            //
721            // Client software MUST ensure that payloadAttributes.timestamp is greater than
722            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
723            // isn't held client software MUST respond with -38003: Invalid payload attributes and
724            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
725            // update MUST NOT be rolled back.
726            //
727            // NOTE: This will also apply to the validation result for the cancun or
728            // shanghai-specific fields provided in the payload attributes.
729            //
730            // To do this, we set the payload attrs to `None` if attribute validation failed, but
731            // we still apply the forkchoice update.
732            if let Err(err) = attr_validation_res {
733                let fcu_res =
734                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
735                // TODO: decide if we want this branch - the FCU INVALID response might be more
736                // useful than the payload attributes INVALID response
737                if fcu_res.is_invalid() {
738                    return Ok(fcu_res)
739                }
740                return Err(err.into())
741            }
742        }
743
744        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
745    }
746
747    /// Returns reference to supported capabilities.
748    pub fn capabilities(&self) -> &EngineCapabilities {
749        &self.inner.capabilities
750    }
751
752    fn get_blobs_v1(
753        &self,
754        versioned_hashes: Vec<B256>,
755    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
756        if versioned_hashes.len() > MAX_BLOB_LIMIT {
757            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
758        }
759
760        self.inner
761            .tx_pool
762            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
763            .map_err(|err| EngineApiError::Internal(Box::new(err)))
764    }
765
766    /// Metered version of `get_blobs_v1`.
767    pub fn get_blobs_v1_metered(
768        &self,
769        versioned_hashes: Vec<B256>,
770    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
771        let hashes_len = versioned_hashes.len();
772        let start = Instant::now();
773        let res = Self::get_blobs_v1(self, versioned_hashes);
774        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
775
776        if let Ok(blobs) = &res {
777            let blobs_found = blobs.iter().flatten().count();
778            let blobs_missed = hashes_len - blobs_found;
779
780            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
781            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
782        }
783
784        res
785    }
786
787    fn get_blobs_v2(
788        &self,
789        versioned_hashes: Vec<B256>,
790    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
791        if versioned_hashes.len() > MAX_BLOB_LIMIT {
792            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
793        }
794
795        self.inner
796            .tx_pool
797            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
798            .map_err(|err| EngineApiError::Internal(Box::new(err)))
799    }
800
801    /// Metered version of `get_blobs_v2`.
802    pub fn get_blobs_v2_metered(
803        &self,
804        versioned_hashes: Vec<B256>,
805    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
806        let hashes_len = versioned_hashes.len();
807        let start = Instant::now();
808        let res = Self::get_blobs_v2(self, versioned_hashes);
809        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
810
811        if let Ok(blobs) = &res {
812            let blobs_found = blobs.iter().flatten().count();
813
814            self.inner
815                .metrics
816                .blob_metrics
817                .get_blobs_requests_blobs_total
818                .increment(hashes_len as u64);
819            self.inner
820                .metrics
821                .blob_metrics
822                .get_blobs_requests_blobs_in_blobpool_total
823                .increment(blobs_found as u64);
824
825            if blobs_found == hashes_len {
826                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
827            } else {
828                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
829            }
830        } else {
831            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
832        }
833
834        res
835    }
836}
837
838// This is the concrete ethereum engine API implementation.
839#[async_trait]
840impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
841    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
842where
843    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
844    EngineT: EngineTypes<ExecutionData = ExecutionData>,
845    Pool: TransactionPool + 'static,
846    Validator: EngineApiValidator<EngineT>,
847    ChainSpec: EthereumHardforks + Send + Sync + 'static,
848{
849    /// Handler for `engine_newPayloadV1`
850    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
851    /// Caution: This should not accept the `withdrawals` field
852    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
853        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
854        let payload =
855            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
856        Ok(self.new_payload_v1_metered(payload).await?)
857    }
858
859    /// Handler for `engine_newPayloadV2`
860    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
861    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
862        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
863        let payload = ExecutionData {
864            payload: payload.into_payload(),
865            sidecar: ExecutionPayloadSidecar::none(),
866        };
867
868        Ok(self.new_payload_v2_metered(payload).await?)
869    }
870
871    /// Handler for `engine_newPayloadV3`
872    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
873    async fn new_payload_v3(
874        &self,
875        payload: ExecutionPayloadV3,
876        versioned_hashes: Vec<B256>,
877        parent_beacon_block_root: B256,
878    ) -> RpcResult<PayloadStatus> {
879        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
880        let payload = ExecutionData {
881            payload: payload.into(),
882            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
883                versioned_hashes,
884                parent_beacon_block_root,
885            }),
886        };
887
888        Ok(self.new_payload_v3_metered(payload).await?)
889    }
890
891    /// Handler for `engine_newPayloadV4`
892    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
893    async fn new_payload_v4(
894        &self,
895        payload: ExecutionPayloadV3,
896        versioned_hashes: Vec<B256>,
897        parent_beacon_block_root: B256,
898        requests: RequestsOrHash,
899    ) -> RpcResult<PayloadStatus> {
900        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
901
902        // Accept requests as a hash only if it is explicitly allowed
903        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
904            return Err(EngineApiError::UnexpectedRequestsHash.into());
905        }
906
907        let payload = ExecutionData {
908            payload: payload.into(),
909            sidecar: ExecutionPayloadSidecar::v4(
910                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
911                PraguePayloadFields { requests },
912            ),
913        };
914
915        Ok(self.new_payload_v4_metered(payload).await?)
916    }
917
918    /// Handler for `engine_forkchoiceUpdatedV1`
919    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
920    ///
921    /// Caution: This should not accept the `withdrawals` field
922    async fn fork_choice_updated_v1(
923        &self,
924        fork_choice_state: ForkchoiceState,
925        payload_attributes: Option<EngineT::PayloadAttributes>,
926    ) -> RpcResult<ForkchoiceUpdated> {
927        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
928        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
929    }
930
931    /// Handler for `engine_forkchoiceUpdatedV2`
932    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
933    async fn fork_choice_updated_v2(
934        &self,
935        fork_choice_state: ForkchoiceState,
936        payload_attributes: Option<EngineT::PayloadAttributes>,
937    ) -> RpcResult<ForkchoiceUpdated> {
938        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
939        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
940    }
941
942    /// Handler for `engine_forkchoiceUpdatedV2`
943    ///
944    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
945    async fn fork_choice_updated_v3(
946        &self,
947        fork_choice_state: ForkchoiceState,
948        payload_attributes: Option<EngineT::PayloadAttributes>,
949    ) -> RpcResult<ForkchoiceUpdated> {
950        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
951        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
952    }
953
954    /// Handler for `engine_getPayloadV1`
955    ///
956    /// Returns the most recent version of the payload that is available in the corresponding
957    /// payload build process at the time of receiving this call.
958    ///
959    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
960    ///
961    /// Caution: This should not return the `withdrawals` field
962    ///
963    /// Note:
964    /// > Provider software MAY stop the corresponding build process after serving this call.
965    async fn get_payload_v1(
966        &self,
967        payload_id: PayloadId,
968    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
969        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
970        Ok(self.get_payload_v1_metered(payload_id).await?)
971    }
972
973    /// Handler for `engine_getPayloadV2`
974    ///
975    /// Returns the most recent version of the payload that is available in the corresponding
976    /// payload build process at the time of receiving this call.
977    ///
978    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
979    ///
980    /// Note:
981    /// > Provider software MAY stop the corresponding build process after serving this call.
982    async fn get_payload_v2(
983        &self,
984        payload_id: PayloadId,
985    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
986        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
987        Ok(self.get_payload_v2_metered(payload_id).await?)
988    }
989
990    /// Handler for `engine_getPayloadV3`
991    ///
992    /// Returns the most recent version of the payload that is available in the corresponding
993    /// payload build process at the time of receiving this call.
994    ///
995    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
996    ///
997    /// Note:
998    /// > Provider software MAY stop the corresponding build process after serving this call.
999    async fn get_payload_v3(
1000        &self,
1001        payload_id: PayloadId,
1002    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1003        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1004        Ok(self.get_payload_v3_metered(payload_id).await?)
1005    }
1006
1007    /// Handler for `engine_getPayloadV4`
1008    ///
1009    /// Returns the most recent version of the payload that is available in the corresponding
1010    /// payload build process at the time of receiving this call.
1011    ///
1012    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1013    ///
1014    /// Note:
1015    /// > Provider software MAY stop the corresponding build process after serving this call.
1016    async fn get_payload_v4(
1017        &self,
1018        payload_id: PayloadId,
1019    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1020        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1021        Ok(self.get_payload_v4_metered(payload_id).await?)
1022    }
1023
1024    /// Handler for `engine_getPayloadV5`
1025    ///
1026    /// Returns the most recent version of the payload that is available in the corresponding
1027    /// payload build process at the time of receiving this call.
1028    ///
1029    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1030    ///
1031    /// Note:
1032    /// > Provider software MAY stop the corresponding build process after serving this call.
1033    async fn get_payload_v5(
1034        &self,
1035        payload_id: PayloadId,
1036    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1037        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1038        Ok(self.get_payload_v5_metered(payload_id).await?)
1039    }
1040
1041    /// Handler for `engine_getPayloadBodiesByHashV1`
1042    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1043    async fn get_payload_bodies_by_hash_v1(
1044        &self,
1045        block_hashes: Vec<BlockHash>,
1046    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1047        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1048        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1049    }
1050
1051    /// Handler for `engine_getPayloadBodiesByRangeV1`
1052    ///
1053    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1054    ///
1055    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1056    /// blocks.
1057    ///
1058    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
1059    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1060    /// adversarial.
1061    ///
1062    /// Implementers should take care when acting on the input to this method, specifically
1063    /// ensuring that the range is limited properly, and that the range boundaries are computed
1064    /// correctly and without panics.
1065    ///
1066    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1067    async fn get_payload_bodies_by_range_v1(
1068        &self,
1069        start: U64,
1070        count: U64,
1071    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1072        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1073        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1074    }
1075
1076    /// Handler for `engine_getClientVersionV1`
1077    ///
1078    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1079    async fn get_client_version_v1(
1080        &self,
1081        client: ClientVersionV1,
1082    ) -> RpcResult<Vec<ClientVersionV1>> {
1083        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1084        Ok(Self::get_client_version_v1(self, client)?)
1085    }
1086
1087    /// Handler for `engine_exchangeCapabilitiesV1`
1088    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1089    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1090        Ok(self.capabilities().list())
1091    }
1092
1093    async fn get_blobs_v1(
1094        &self,
1095        versioned_hashes: Vec<B256>,
1096    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1097        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1098        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1099    }
1100
1101    async fn get_blobs_v2(
1102        &self,
1103        versioned_hashes: Vec<B256>,
1104    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1105        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1106        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1107    }
1108}
1109
1110impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1111    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1112where
1113    EngineT: EngineTypes,
1114    Self: EngineApiServer<EngineT>,
1115{
1116    fn into_rpc_module(self) -> RpcModule<()> {
1117        self.into_rpc().remove_context()
1118    }
1119}
1120
1121impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1122    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1123where
1124    PayloadT: PayloadTypes,
1125{
1126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1127        f.debug_struct("EngineApi").finish_non_exhaustive()
1128    }
1129}
1130
1131impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1132    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1133where
1134    PayloadT: PayloadTypes,
1135{
1136    fn clone(&self) -> Self {
1137        Self { inner: Arc::clone(&self.inner) }
1138    }
1139}
1140
1141/// The container type for the engine API internals.
1142struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1143    /// The provider to interact with the chain.
1144    provider: Provider,
1145    /// Consensus configuration
1146    chain_spec: Arc<ChainSpec>,
1147    /// The channel to send messages to the beacon consensus engine.
1148    beacon_consensus: ConsensusEngineHandle<PayloadT>,
1149    /// The type that can communicate with the payload service to retrieve payloads.
1150    payload_store: PayloadStore<PayloadT>,
1151    /// For spawning and executing async tasks
1152    task_spawner: Box<dyn TaskSpawner>,
1153    /// The latency and response type metrics for engine api calls
1154    metrics: EngineApiMetrics,
1155    /// Identification of the execution client used by the consensus client
1156    client: ClientVersionV1,
1157    /// The list of all supported Engine capabilities available over the engine endpoint.
1158    capabilities: EngineCapabilities,
1159    /// Transaction pool.
1160    tx_pool: Pool,
1161    /// Engine validator.
1162    validator: Validator,
1163    /// Start time of the latest payload request
1164    latest_new_payload_response: Mutex<Option<Instant>>,
1165    accept_execution_requests_hash: bool,
1166}
1167
1168impl<Provider, PayloadT, Pool, Validator, ChainSpec>
1169    EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
1170where
1171    PayloadT: PayloadTypes,
1172{
1173    /// Tracks the elapsed time between the new payload response and the received forkchoice update
1174    /// request.
1175    fn record_elapsed_time_on_fcu(&self) {
1176        if let Some(start_time) = self.latest_new_payload_response.lock().take() {
1177            let elapsed_time = start_time.elapsed();
1178            self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
1179        }
1180    }
1181
1182    /// Updates the timestamp for the latest new payload response.
1183    fn on_new_payload_response(&self) {
1184        self.latest_new_payload_response.lock().replace(Instant::now());
1185    }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190    use super::*;
1191    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1192    use assert_matches::assert_matches;
1193    use reth_chainspec::{ChainSpec, MAINNET};
1194    use reth_engine_primitives::BeaconEngineMessage;
1195    use reth_ethereum_engine_primitives::EthEngineTypes;
1196    use reth_ethereum_primitives::Block;
1197    use reth_node_ethereum::EthereumEngineValidator;
1198    use reth_payload_builder::test_utils::spawn_test_payload_service;
1199    use reth_provider::test_utils::MockEthProvider;
1200    use reth_tasks::TokioTaskExecutor;
1201    use reth_transaction_pool::noop::NoopTransactionPool;
1202    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1203
1204    fn setup_engine_api() -> (
1205        EngineApiTestHandle,
1206        EngineApi<
1207            Arc<MockEthProvider>,
1208            EthEngineTypes,
1209            NoopTransactionPool,
1210            EthereumEngineValidator,
1211            ChainSpec,
1212        >,
1213    ) {
1214        let client = ClientVersionV1 {
1215            code: ClientCode::RH,
1216            name: "Reth".to_string(),
1217            version: "v0.2.0-beta.5".to_string(),
1218            commit: "defa64b2".to_string(),
1219        };
1220
1221        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1222        let provider = Arc::new(MockEthProvider::default());
1223        let payload_store = spawn_test_payload_service();
1224        let (to_engine, engine_rx) = unbounded_channel();
1225        let task_executor = Box::<TokioTaskExecutor>::default();
1226        let api = EngineApi::new(
1227            provider.clone(),
1228            chain_spec.clone(),
1229            ConsensusEngineHandle::new(to_engine),
1230            payload_store.into(),
1231            NoopTransactionPool::default(),
1232            task_executor,
1233            client,
1234            EngineCapabilities::default(),
1235            EthereumEngineValidator::new(chain_spec.clone()),
1236            false,
1237        );
1238        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1239        (handle, api)
1240    }
1241
1242    #[tokio::test]
1243    async fn engine_client_version_v1() {
1244        let client = ClientVersionV1 {
1245            code: ClientCode::RH,
1246            name: "Reth".to_string(),
1247            version: "v0.2.0-beta.5".to_string(),
1248            commit: "defa64b2".to_string(),
1249        };
1250        let (_, api) = setup_engine_api();
1251        let res = api.get_client_version_v1(client.clone());
1252        assert_eq!(res.unwrap(), vec![client]);
1253    }
1254
1255    struct EngineApiTestHandle {
1256        #[allow(dead_code)]
1257        chain_spec: Arc<ChainSpec>,
1258        provider: Arc<MockEthProvider>,
1259        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1260    }
1261
1262    #[tokio::test]
1263    async fn forwards_responses_to_consensus_engine() {
1264        let (mut handle, api) = setup_engine_api();
1265
1266        tokio::spawn(async move {
1267            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1268            let execution_data = ExecutionData {
1269                payload: payload_v1.into(),
1270                sidecar: ExecutionPayloadSidecar::none(),
1271            };
1272
1273            api.new_payload_v1(execution_data).await.unwrap();
1274        });
1275        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1276    }
1277
1278    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
1279    mod get_payload_bodies {
1280        use super::*;
1281        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1282        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1283
1284        #[tokio::test]
1285        async fn invalid_params() {
1286            let (_, api) = setup_engine_api();
1287
1288            let by_range_tests = [
1289                // (start, count)
1290                (0, 0),
1291                (0, 1),
1292                (1, 0),
1293            ];
1294
1295            // test [EngineApiMessage::GetPayloadBodiesByRange]
1296            for (start, count) in by_range_tests {
1297                let res = api.get_payload_bodies_by_range_v1(start, count).await;
1298                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1299            }
1300        }
1301
1302        #[tokio::test]
1303        async fn request_too_large() {
1304            let (_, api) = setup_engine_api();
1305
1306            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1307            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1308            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1309        }
1310
1311        #[tokio::test]
1312        async fn returns_payload_bodies() {
1313            let mut rng = generators::rng();
1314            let (handle, api) = setup_engine_api();
1315
1316            let (start, count) = (1, 10);
1317            let blocks = random_block_range(
1318                &mut rng,
1319                start..=start + count - 1,
1320                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1321            );
1322            handle
1323                .provider
1324                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1325
1326            let expected = blocks
1327                .iter()
1328                .cloned()
1329                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1330                .collect::<Vec<_>>();
1331
1332            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1333            assert_eq!(res, expected);
1334        }
1335
1336        #[tokio::test]
1337        async fn returns_payload_bodies_with_gaps() {
1338            let mut rng = generators::rng();
1339            let (handle, api) = setup_engine_api();
1340
1341            let (start, count) = (1, 100);
1342            let blocks = random_block_range(
1343                &mut rng,
1344                start..=start + count - 1,
1345                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1346            );
1347
1348            // Insert only blocks in ranges 1-25 and 50-75
1349            let first_missing_range = 26..=50;
1350            let second_missing_range = 76..=100;
1351            handle.provider.extend_blocks(
1352                blocks
1353                    .iter()
1354                    .filter(|b| {
1355                        !first_missing_range.contains(&b.number) &&
1356                            !second_missing_range.contains(&b.number)
1357                    })
1358                    .map(|b| (b.hash(), b.clone().into_block())),
1359            );
1360
1361            let expected = blocks
1362                .iter()
1363                // filter anything after the second missing range to ensure we don't expect trailing
1364                // `None`s
1365                .filter(|b| !second_missing_range.contains(&b.number))
1366                .cloned()
1367                .map(|b| {
1368                    if first_missing_range.contains(&b.number) {
1369                        None
1370                    } else {
1371                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1372                    }
1373                })
1374                .collect::<Vec<_>>();
1375
1376            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1377            assert_eq!(res, expected);
1378
1379            let expected = blocks
1380                .iter()
1381                .cloned()
1382                // ensure we still return trailing `None`s here because by-hash will not be aware
1383                // of the missing block's number, and cannot compare it to the current best block
1384                .map(|b| {
1385                    if first_missing_range.contains(&b.number) ||
1386                        second_missing_range.contains(&b.number)
1387                    {
1388                        None
1389                    } else {
1390                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1391                    }
1392                })
1393                .collect::<Vec<_>>();
1394
1395            let hashes = blocks.iter().map(|b| b.hash()).collect();
1396            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1397            assert_eq!(res, expected);
1398        }
1399    }
1400}