reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes, EngineValidator};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24    validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload,
25    PayloadBuilderAttributes, PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{sync::Arc, time::Instant};
33use tokio::sync::oneshot;
34use tracing::{debug, trace, warn};
35
/// The Engine API response sender.
///
/// A [`oneshot::Sender`] that carries an [`EngineApiResult`] wrapping the `Ok` payload type.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
38
/// The upper limit for the number of bodies in a single payload bodies request.
///
/// Requests by range or by hash that ask for more than this many bodies are rejected with
/// [`EngineApiError::PayloadRequestTooLarge`].
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
41
/// The upper limit for blobs in `engine_getBlobsVx`.
///
/// Requests with more versioned hashes than this are rejected with
/// [`EngineApiError::BlobRequestTooLarge`].
const MAX_BLOB_LIMIT: usize = 128;
44
/// The Engine API implementation that grants the Consensus layer access to data and
/// functions in the Execution layer that are crucial for the consensus process.
///
/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
/// API processing. It can be reused by other non-L1 engine APIs that deviate from the L1 spec but
/// still follow the engine API model.
///
/// ## Implementers
///
/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
/// endpoints (e.g. opstack).
/// See also the [`EngineApiServer`] implementation for this type, which is the
/// L1 implementation.
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    // Shared state; the handle only holds an `Arc` to it.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
63
64impl<Provider, PayloadT, Pool, Validator, ChainSpec>
65    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
66where
67    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
68    PayloadT: PayloadTypes,
69    Pool: TransactionPool + 'static,
70    Validator: EngineValidator<PayloadT>,
71    ChainSpec: EthereumHardforks + Send + Sync + 'static,
72{
73    /// Create new instance of [`EngineApi`].
74    #[expect(clippy::too_many_arguments)]
75    pub fn new(
76        provider: Provider,
77        chain_spec: Arc<ChainSpec>,
78        beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
79        payload_store: PayloadStore<PayloadT>,
80        tx_pool: Pool,
81        task_spawner: Box<dyn TaskSpawner>,
82        client: ClientVersionV1,
83        capabilities: EngineCapabilities,
84        validator: Validator,
85        accept_execution_requests_hash: bool,
86    ) -> Self {
87        let inner = Arc::new(EngineApiInner {
88            provider,
89            chain_spec,
90            beacon_consensus,
91            payload_store,
92            task_spawner,
93            metrics: EngineApiMetrics::default(),
94            client,
95            capabilities,
96            tx_pool,
97            validator,
98            latest_new_payload_response: Mutex::new(None),
99            accept_execution_requests_hash,
100        });
101        Self { inner }
102    }
103
104    /// Fetches the client version.
105    pub fn get_client_version_v1(
106        &self,
107        _client: ClientVersionV1,
108    ) -> EngineApiResult<Vec<ClientVersionV1>> {
109        Ok(vec![self.inner.client.clone()])
110    }
111
112    /// Fetches the attributes for the payload with the given id.
113    async fn get_payload_attributes(
114        &self,
115        payload_id: PayloadId,
116    ) -> EngineApiResult<PayloadT::PayloadBuilderAttributes> {
117        Ok(self
118            .inner
119            .payload_store
120            .payload_attributes(payload_id)
121            .await
122            .ok_or(EngineApiError::UnknownPayload)??)
123    }
124
125    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
126    /// Caution: This should not accept the `withdrawals` field
127    pub async fn new_payload_v1(
128        &self,
129        payload: PayloadT::ExecutionData,
130    ) -> EngineApiResult<PayloadStatus> {
131        let payload_or_attrs = PayloadOrAttributes::<
132            '_,
133            PayloadT::ExecutionData,
134            PayloadT::PayloadAttributes,
135        >::from_execution_payload(&payload);
136
137        self.inner
138            .validator
139            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
140
141        Ok(self
142            .inner
143            .beacon_consensus
144            .new_payload(payload)
145            .await
146            .inspect(|_| self.inner.on_new_payload_response())?)
147    }
148
149    /// Metered version of `new_payload_v1`.
150    pub async fn new_payload_v1_metered(
151        &self,
152        payload: PayloadT::ExecutionData,
153    ) -> EngineApiResult<PayloadStatus> {
154        let start = Instant::now();
155        let gas_used = payload.gas_used();
156
157        let res = Self::new_payload_v1(self, payload).await;
158        let elapsed = start.elapsed();
159        self.inner.metrics.latency.new_payload_v1.record(elapsed);
160        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
161        res
162    }
163
164    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
165    pub async fn new_payload_v2(
166        &self,
167        payload: PayloadT::ExecutionData,
168    ) -> EngineApiResult<PayloadStatus> {
169        let payload_or_attrs = PayloadOrAttributes::<
170            '_,
171            PayloadT::ExecutionData,
172            PayloadT::PayloadAttributes,
173        >::from_execution_payload(&payload);
174        self.inner
175            .validator
176            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
177        Ok(self
178            .inner
179            .beacon_consensus
180            .new_payload(payload)
181            .await
182            .inspect(|_| self.inner.on_new_payload_response())?)
183    }
184
185    /// Metered version of `new_payload_v2`.
186    pub async fn new_payload_v2_metered(
187        &self,
188        payload: PayloadT::ExecutionData,
189    ) -> EngineApiResult<PayloadStatus> {
190        let start = Instant::now();
191        let gas_used = payload.gas_used();
192
193        let res = Self::new_payload_v2(self, payload).await;
194        let elapsed = start.elapsed();
195        self.inner.metrics.latency.new_payload_v2.record(elapsed);
196        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
197        res
198    }
199
200    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
201    pub async fn new_payload_v3(
202        &self,
203        payload: PayloadT::ExecutionData,
204    ) -> EngineApiResult<PayloadStatus> {
205        let payload_or_attrs = PayloadOrAttributes::<
206            '_,
207            PayloadT::ExecutionData,
208            PayloadT::PayloadAttributes,
209        >::from_execution_payload(&payload);
210        self.inner
211            .validator
212            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
213
214        Ok(self
215            .inner
216            .beacon_consensus
217            .new_payload(payload)
218            .await
219            .inspect(|_| self.inner.on_new_payload_response())?)
220    }
221
222    /// Metrics version of `new_payload_v3`
223    pub async fn new_payload_v3_metered(
224        &self,
225        payload: PayloadT::ExecutionData,
226    ) -> RpcResult<PayloadStatus> {
227        let start = Instant::now();
228        let gas_used = payload.gas_used();
229
230        let res = Self::new_payload_v3(self, payload).await;
231        let elapsed = start.elapsed();
232        self.inner.metrics.latency.new_payload_v3.record(elapsed);
233        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
234        Ok(res?)
235    }
236
237    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
238    pub async fn new_payload_v4(
239        &self,
240        payload: PayloadT::ExecutionData,
241    ) -> EngineApiResult<PayloadStatus> {
242        let payload_or_attrs = PayloadOrAttributes::<
243            '_,
244            PayloadT::ExecutionData,
245            PayloadT::PayloadAttributes,
246        >::from_execution_payload(&payload);
247        self.inner
248            .validator
249            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
250
251        Ok(self
252            .inner
253            .beacon_consensus
254            .new_payload(payload)
255            .await
256            .inspect(|_| self.inner.on_new_payload_response())?)
257    }
258
259    /// Metrics version of `new_payload_v4`
260    pub async fn new_payload_v4_metered(
261        &self,
262        payload: PayloadT::ExecutionData,
263    ) -> RpcResult<PayloadStatus> {
264        let start = Instant::now();
265        let gas_used = payload.gas_used();
266
267        let res = Self::new_payload_v4(self, payload).await;
268
269        let elapsed = start.elapsed();
270        self.inner.metrics.latency.new_payload_v4.record(elapsed);
271        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
272        Ok(res?)
273    }
274
275    /// Returns whether the engine accepts execution requests hash.
276    pub fn accept_execution_requests_hash(&self) -> bool {
277        self.inner.accept_execution_requests_hash
278    }
279}
280
281impl<Provider, EngineT, Pool, Validator, ChainSpec>
282    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
283where
284    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
285    EngineT: EngineTypes,
286    Pool: TransactionPool + 'static,
287    Validator: EngineValidator<EngineT>,
288    ChainSpec: EthereumHardforks + Send + Sync + 'static,
289{
290    /// Sends a message to the beacon consensus engine to update the fork choice _without_
291    /// withdrawals.
292    ///
293    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
294    ///
295    /// Caution: This should not accept the `withdrawals` field
296    pub async fn fork_choice_updated_v1(
297        &self,
298        state: ForkchoiceState,
299        payload_attrs: Option<EngineT::PayloadAttributes>,
300    ) -> EngineApiResult<ForkchoiceUpdated> {
301        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
302            .await
303    }
304
305    /// Metrics version of `fork_choice_updated_v1`
306    pub async fn fork_choice_updated_v1_metered(
307        &self,
308        state: ForkchoiceState,
309        payload_attrs: Option<EngineT::PayloadAttributes>,
310    ) -> EngineApiResult<ForkchoiceUpdated> {
311        let start = Instant::now();
312        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
313        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
314        self.inner.metrics.fcu_response.update_response_metrics(&res);
315        res
316    }
317
318    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
319    /// but only _after_ shanghai.
320    ///
321    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
322    pub async fn fork_choice_updated_v2(
323        &self,
324        state: ForkchoiceState,
325        payload_attrs: Option<EngineT::PayloadAttributes>,
326    ) -> EngineApiResult<ForkchoiceUpdated> {
327        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
328            .await
329    }
330
331    /// Metrics version of `fork_choice_updated_v2`
332    pub async fn fork_choice_updated_v2_metered(
333        &self,
334        state: ForkchoiceState,
335        payload_attrs: Option<EngineT::PayloadAttributes>,
336    ) -> EngineApiResult<ForkchoiceUpdated> {
337        let start = Instant::now();
338        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
339        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
340        self.inner.metrics.fcu_response.update_response_metrics(&res);
341        res
342    }
343
344    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
345    /// but only _after_ cancun.
346    ///
347    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
348    pub async fn fork_choice_updated_v3(
349        &self,
350        state: ForkchoiceState,
351        payload_attrs: Option<EngineT::PayloadAttributes>,
352    ) -> EngineApiResult<ForkchoiceUpdated> {
353        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
354            .await
355    }
356
357    /// Metrics version of `fork_choice_updated_v3`
358    pub async fn fork_choice_updated_v3_metered(
359        &self,
360        state: ForkchoiceState,
361        payload_attrs: Option<EngineT::PayloadAttributes>,
362    ) -> EngineApiResult<ForkchoiceUpdated> {
363        let start = Instant::now();
364        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
365        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
366        self.inner.metrics.fcu_response.update_response_metrics(&res);
367        res
368    }
369
370    /// Helper function for retrieving the build payload by id.
371    async fn get_built_payload(
372        &self,
373        payload_id: PayloadId,
374    ) -> EngineApiResult<EngineT::BuiltPayload> {
375        self.inner
376            .payload_store
377            .resolve(payload_id)
378            .await
379            .ok_or(EngineApiError::UnknownPayload)?
380            .map_err(|_| EngineApiError::UnknownPayload)
381    }
382
383    /// Helper function for validating the payload timestamp and retrieving & converting the payload
384    /// into desired envelope.
385    async fn get_payload_inner<R>(
386        &self,
387        payload_id: PayloadId,
388        version: EngineApiMessageVersion,
389    ) -> EngineApiResult<R>
390    where
391        EngineT::BuiltPayload: TryInto<R>,
392    {
393        // First we fetch the payload attributes to check the timestamp
394        let attributes = self.get_payload_attributes(payload_id).await?;
395
396        // validate timestamp according to engine rules
397        validate_payload_timestamp(&self.inner.chain_spec, version, attributes.timestamp())?;
398
399        // Now resolve the payload
400        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
401            warn!(?version, "could not transform built payload");
402            EngineApiError::UnknownPayload
403        })
404    }
405
406    /// Returns the most recent version of the payload that is available in the corresponding
407    /// payload build process at the time of receiving this call.
408    ///
409    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
410    ///
411    /// Caution: This should not return the `withdrawals` field
412    ///
413    /// Note:
414    /// > Provider software MAY stop the corresponding build process after serving this call.
415    pub async fn get_payload_v1(
416        &self,
417        payload_id: PayloadId,
418    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
419        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
420            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
421            EngineApiError::UnknownPayload
422        })
423    }
424
425    /// Metrics version of `get_payload_v1`
426    pub async fn get_payload_v1_metered(
427        &self,
428        payload_id: PayloadId,
429    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
430        let start = Instant::now();
431        let res = Self::get_payload_v1(self, payload_id).await;
432        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
433        res
434    }
435
436    /// Returns the most recent version of the payload that is available in the corresponding
437    /// payload build process at the time of receiving this call.
438    ///
439    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
440    ///
441    /// Note:
442    /// > Provider software MAY stop the corresponding build process after serving this call.
443    pub async fn get_payload_v2(
444        &self,
445        payload_id: PayloadId,
446    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
447        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
448    }
449
450    /// Metrics version of `get_payload_v2`
451    pub async fn get_payload_v2_metered(
452        &self,
453        payload_id: PayloadId,
454    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
455        let start = Instant::now();
456        let res = Self::get_payload_v2(self, payload_id).await;
457        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
458        res
459    }
460
461    /// Returns the most recent version of the payload that is available in the corresponding
462    /// payload build process at the time of receiving this call.
463    ///
464    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
465    ///
466    /// Note:
467    /// > Provider software MAY stop the corresponding build process after serving this call.
468    pub async fn get_payload_v3(
469        &self,
470        payload_id: PayloadId,
471    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
472        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
473    }
474
475    /// Metrics version of `get_payload_v3`
476    pub async fn get_payload_v3_metered(
477        &self,
478        payload_id: PayloadId,
479    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
480        let start = Instant::now();
481        let res = Self::get_payload_v3(self, payload_id).await;
482        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
483        res
484    }
485
486    /// Returns the most recent version of the payload that is available in the corresponding
487    /// payload build process at the time of receiving this call.
488    ///
489    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
490    ///
491    /// Note:
492    /// > Provider software MAY stop the corresponding build process after serving this call.
493    pub async fn get_payload_v4(
494        &self,
495        payload_id: PayloadId,
496    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
497        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
498    }
499
500    /// Metrics version of `get_payload_v4`
501    pub async fn get_payload_v4_metered(
502        &self,
503        payload_id: PayloadId,
504    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
505        let start = Instant::now();
506        let res = Self::get_payload_v4(self, payload_id).await;
507        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
508        res
509    }
510
511    /// Handler for `engine_getPayloadV5`
512    ///
513    /// Returns the most recent version of the payload that is available in the corresponding
514    /// payload build process at the time of receiving this call.
515    ///
516    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
517    ///
518    /// Note:
519    /// > Provider software MAY stop the corresponding build process after serving this call.
520    pub async fn get_payload_v5(
521        &self,
522        payload_id: PayloadId,
523    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
524        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
525    }
526
527    /// Metrics version of `get_payload_v5`
528    pub async fn get_payload_v5_metered(
529        &self,
530        payload_id: PayloadId,
531    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
532        let start = Instant::now();
533        let res = Self::get_payload_v5(self, payload_id).await;
534        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
535        res
536    }
537
538    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
539    /// blocks and returns the mapped payload bodies.
540    pub async fn get_payload_bodies_by_range_with<F, R>(
541        &self,
542        start: BlockNumber,
543        count: u64,
544        f: F,
545    ) -> EngineApiResult<Vec<Option<R>>>
546    where
547        F: Fn(Provider::Block) -> R + Send + 'static,
548        R: Send + 'static,
549    {
550        let (tx, rx) = oneshot::channel();
551        let inner = self.inner.clone();
552
553        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
554            if count > MAX_PAYLOAD_BODIES_LIMIT {
555                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
556                return;
557            }
558
559            if start == 0 || count == 0 {
560                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
561                return;
562            }
563
564            let mut result = Vec::with_capacity(count as usize);
565
566            // -1 so range is inclusive
567            let mut end = start.saturating_add(count - 1);
568
569            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
570            // truncate the end if it's greater than the last block
571            if let Ok(best_block) = inner.provider.best_block_number() {
572                if end > best_block {
573                    end = best_block;
574                }
575            }
576
577            for num in start..=end {
578                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
579                match block_result {
580                    Ok(block) => {
581                        result.push(block.map(&f));
582                    }
583                    Err(err) => {
584                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
585                        return;
586                    }
587                };
588            }
589            tx.send(Ok(result)).ok();
590        }));
591
592        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
593    }
594
595    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
596    /// blocks.
597    ///
598    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
599    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
600    /// adversarial.
601    ///
602    /// Implementers should take care when acting on the input to this method, specifically
603    /// ensuring that the range is limited properly, and that the range boundaries are computed
604    /// correctly and without panics.
605    pub async fn get_payload_bodies_by_range_v1(
606        &self,
607        start: BlockNumber,
608        count: u64,
609    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
610        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
611            transactions: block.body().encoded_2718_transactions(),
612            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
613        })
614        .await
615    }
616
617    /// Metrics version of `get_payload_bodies_by_range_v1`
618    pub async fn get_payload_bodies_by_range_v1_metered(
619        &self,
620        start: BlockNumber,
621        count: u64,
622    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
623        let start_time = Instant::now();
624        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
625        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
626        res
627    }
628
629    /// Called to retrieve execution payload bodies by hashes.
630    pub async fn get_payload_bodies_by_hash_with<F, R>(
631        &self,
632        hashes: Vec<BlockHash>,
633        f: F,
634    ) -> EngineApiResult<Vec<Option<R>>>
635    where
636        F: Fn(Provider::Block) -> R + Send + 'static,
637        R: Send + 'static,
638    {
639        let len = hashes.len() as u64;
640        if len > MAX_PAYLOAD_BODIES_LIMIT {
641            return Err(EngineApiError::PayloadRequestTooLarge { len });
642        }
643
644        let (tx, rx) = oneshot::channel();
645        let inner = self.inner.clone();
646
647        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
648            let mut result = Vec::with_capacity(hashes.len());
649            for hash in hashes {
650                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
651                match block_result {
652                    Ok(block) => {
653                        result.push(block.map(&f));
654                    }
655                    Err(err) => {
656                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
657                        return;
658                    }
659                }
660            }
661            tx.send(Ok(result)).ok();
662        }));
663
664        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
665    }
666
667    /// Called to retrieve execution payload bodies by hashes.
668    pub async fn get_payload_bodies_by_hash_v1(
669        &self,
670        hashes: Vec<BlockHash>,
671    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
672        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
673            transactions: block.body().encoded_2718_transactions(),
674            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
675        })
676        .await
677    }
678
679    /// Metrics version of `get_payload_bodies_by_hash_v1`
680    pub async fn get_payload_bodies_by_hash_v1_metered(
681        &self,
682        hashes: Vec<BlockHash>,
683    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
684        let start = Instant::now();
685        let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
686        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
687        res.await
688    }
689
690    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
691    /// update.
692    ///
693    /// The payload attributes will be validated according to the engine API rules for the given
694    /// message version:
695    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
696    ///   validated according to the Paris rules.
697    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
698    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
699    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
700    ///
701    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
702    ///   validated according to the Cancun rules.
703    async fn validate_and_execute_forkchoice(
704        &self,
705        version: EngineApiMessageVersion,
706        state: ForkchoiceState,
707        payload_attrs: Option<EngineT::PayloadAttributes>,
708    ) -> EngineApiResult<ForkchoiceUpdated> {
709        self.inner.record_elapsed_time_on_fcu();
710
711        if let Some(ref attrs) = payload_attrs {
712            let attr_validation_res =
713                self.inner.validator.ensure_well_formed_attributes(version, attrs);
714
715            // From the engine API spec:
716            //
717            // Client software MUST ensure that payloadAttributes.timestamp is greater than
718            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
719            // isn't held client software MUST respond with -38003: Invalid payload attributes and
720            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
721            // update MUST NOT be rolled back.
722            //
723            // NOTE: This will also apply to the validation result for the cancun or
724            // shanghai-specific fields provided in the payload attributes.
725            //
726            // To do this, we set the payload attrs to `None` if attribute validation failed, but
727            // we still apply the forkchoice update.
728            if let Err(err) = attr_validation_res {
729                let fcu_res =
730                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
731                // TODO: decide if we want this branch - the FCU INVALID response might be more
732                // useful than the payload attributes INVALID response
733                if fcu_res.is_invalid() {
734                    return Ok(fcu_res)
735                }
736                return Err(err.into())
737            }
738        }
739
740        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
741    }
742
743    /// Returns reference to supported capabilities.
744    pub fn capabilities(&self) -> &EngineCapabilities {
745        &self.inner.capabilities
746    }
747
748    fn get_blobs_v1(
749        &self,
750        versioned_hashes: Vec<B256>,
751    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
752        if versioned_hashes.len() > MAX_BLOB_LIMIT {
753            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
754        }
755
756        self.inner
757            .tx_pool
758            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
759            .map_err(|err| EngineApiError::Internal(Box::new(err)))
760    }
761
762    /// Metered version of `get_blobs_v1`.
763    pub fn get_blobs_v1_metered(
764        &self,
765        versioned_hashes: Vec<B256>,
766    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
767        let hashes_len = versioned_hashes.len();
768        let start = Instant::now();
769        let res = Self::get_blobs_v1(self, versioned_hashes);
770        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
771
772        if let Ok(blobs) = &res {
773            let blobs_found = blobs.iter().flatten().count();
774            let blobs_missed = hashes_len - blobs_found;
775
776            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
777            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
778        }
779
780        res
781    }
782
783    fn get_blobs_v2(
784        &self,
785        versioned_hashes: Vec<B256>,
786    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
787        if versioned_hashes.len() > MAX_BLOB_LIMIT {
788            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
789        }
790
791        self.inner
792            .tx_pool
793            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
794            .map_err(|err| EngineApiError::Internal(Box::new(err)))
795    }
796
797    /// Metered version of `get_blobs_v2`.
798    pub fn get_blobs_v2_metered(
799        &self,
800        versioned_hashes: Vec<B256>,
801    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
802        let hashes_len = versioned_hashes.len();
803        let start = Instant::now();
804        let res = Self::get_blobs_v2(self, versioned_hashes);
805        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
806
807        if let Ok(blobs) = &res {
808            let blobs_found = blobs.iter().flatten().count();
809
810            self.inner
811                .metrics
812                .blob_metrics
813                .get_blobs_requests_blobs_total
814                .increment(hashes_len as u64);
815            self.inner
816                .metrics
817                .blob_metrics
818                .get_blobs_requests_blobs_in_blobpool_total
819                .increment(blobs_found as u64);
820
821            if blobs_found == hashes_len {
822                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
823            } else {
824                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
825            }
826        } else {
827            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
828        }
829
830        res
831    }
832}
833
// This is the concrete ethereum engine API implementation.
//
// The handlers are thin: most log the call, build the input the inherent method expects and
// delegate to it, with `?` converting the inherent method's error into the RPC error.
#[async_trait]
impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
where
    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
    EngineT: EngineTypes<ExecutionData = ExecutionData>,
    Pool: TransactionPool + 'static,
    Validator: EngineValidator<EngineT>,
    ChainSpec: EthereumHardforks + Send + Sync + 'static,
{
    /// Handler for `engine_newPayloadV1`
    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
    /// Caution: This should not accept the `withdrawals` field
    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
        // V1 payloads carry no sidecar fields.
        let payload =
            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
        Ok(self.new_payload_v1_metered(payload).await?)
    }

    /// Handler for `engine_newPayloadV2`
    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
        // V2 payloads carry no sidecar fields.
        let payload = ExecutionData {
            payload: payload.into_payload(),
            sidecar: ExecutionPayloadSidecar::none(),
        };

        Ok(self.new_payload_v2_metered(payload).await?)
    }

    /// Handler for `engine_newPayloadV3`
    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
    async fn new_payload_v3(
        &self,
        payload: ExecutionPayloadV3,
        versioned_hashes: Vec<B256>,
        parent_beacon_block_root: B256,
    ) -> RpcResult<PayloadStatus> {
        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
        // The extra V3 parameters travel in the sidecar as Cancun fields.
        let payload = ExecutionData {
            payload: payload.into(),
            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
                versioned_hashes,
                parent_beacon_block_root,
            }),
        };

        Ok(self.new_payload_v3_metered(payload).await?)
    }

    /// Handler for `engine_newPayloadV4`
    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
    ///
    /// Fails with [`EngineApiError::UnexpectedRequestsHash`] if `requests` is given as a hash
    /// while accepting request hashes is not enabled for this API.
    async fn new_payload_v4(
        &self,
        payload: ExecutionPayloadV3,
        versioned_hashes: Vec<B256>,
        parent_beacon_block_root: B256,
        requests: RequestsOrHash,
    ) -> RpcResult<PayloadStatus> {
        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");

        // Accept requests as a hash only if it is explicitly allowed
        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
            return Err(EngineApiError::UnexpectedRequestsHash.into());
        }

        // The Cancun parameters and the Prague `requests` both travel in the sidecar.
        let payload = ExecutionData {
            payload: payload.into(),
            sidecar: ExecutionPayloadSidecar::v4(
                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
                PraguePayloadFields { requests },
            ),
        };

        Ok(self.new_payload_v4_metered(payload).await?)
    }

    /// Handler for `engine_forkchoiceUpdatedV1`
    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
    ///
    /// Caution: This should not accept the `withdrawals` field
    async fn fork_choice_updated_v1(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<EngineT::PayloadAttributes>,
    ) -> RpcResult<ForkchoiceUpdated> {
        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
    }

    /// Handler for `engine_forkchoiceUpdatedV2`
    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
    async fn fork_choice_updated_v2(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<EngineT::PayloadAttributes>,
    ) -> RpcResult<ForkchoiceUpdated> {
        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
    }

    /// Handler for `engine_forkchoiceUpdatedV3`
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
    async fn fork_choice_updated_v3(
        &self,
        fork_choice_state: ForkchoiceState,
        payload_attributes: Option<EngineT::PayloadAttributes>,
    ) -> RpcResult<ForkchoiceUpdated> {
        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
    }

    /// Handler for `engine_getPayloadV1`
    ///
    /// Returns the most recent version of the payload that is available in the corresponding
    /// payload build process at the time of receiving this call.
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
    ///
    /// Caution: This should not return the `withdrawals` field
    ///
    /// Note:
    /// > Provider software MAY stop the corresponding build process after serving this call.
    async fn get_payload_v1(
        &self,
        payload_id: PayloadId,
    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
        Ok(self.get_payload_v1_metered(payload_id).await?)
    }

    /// Handler for `engine_getPayloadV2`
    ///
    /// Returns the most recent version of the payload that is available in the corresponding
    /// payload build process at the time of receiving this call.
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
    ///
    /// Note:
    /// > Provider software MAY stop the corresponding build process after serving this call.
    async fn get_payload_v2(
        &self,
        payload_id: PayloadId,
    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
        // NOTE(review): unlike the sibling handlers this logs at `debug` level and includes the
        // payload id - confirm whether that difference is intentional.
        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
        Ok(self.get_payload_v2_metered(payload_id).await?)
    }

    /// Handler for `engine_getPayloadV3`
    ///
    /// Returns the most recent version of the payload that is available in the corresponding
    /// payload build process at the time of receiving this call.
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
    ///
    /// Note:
    /// > Provider software MAY stop the corresponding build process after serving this call.
    async fn get_payload_v3(
        &self,
        payload_id: PayloadId,
    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
        Ok(self.get_payload_v3_metered(payload_id).await?)
    }

    /// Handler for `engine_getPayloadV4`
    ///
    /// Returns the most recent version of the payload that is available in the corresponding
    /// payload build process at the time of receiving this call.
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
    ///
    /// Note:
    /// > Provider software MAY stop the corresponding build process after serving this call.
    async fn get_payload_v4(
        &self,
        payload_id: PayloadId,
    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
        Ok(self.get_payload_v4_metered(payload_id).await?)
    }

    /// Handler for `engine_getPayloadV5`
    ///
    /// Returns the most recent version of the payload that is available in the corresponding
    /// payload build process at the time of receiving this call.
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
    ///
    /// Note:
    /// > Provider software MAY stop the corresponding build process after serving this call.
    async fn get_payload_v5(
        &self,
        payload_id: PayloadId,
    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
        Ok(self.get_payload_v5_metered(payload_id).await?)
    }

    /// Handler for `engine_getPayloadBodiesByHashV1`
    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
    async fn get_payload_bodies_by_hash_v1(
        &self,
        block_hashes: Vec<BlockHash>,
    ) -> RpcResult<ExecutionPayloadBodiesV1> {
        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
    }

    /// Handler for `engine_getPayloadBodiesByRangeV1`
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
    ///
    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
    /// blocks.
    ///
    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
    /// adversarial.
    ///
    /// Implementers should take care when acting on the input to this method, specifically
    /// ensuring that the range is limited properly, and that the range boundaries are computed
    /// correctly and without panics.
    ///
    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
    async fn get_payload_bodies_by_range_v1(
        &self,
        start: U64,
        count: U64,
    ) -> RpcResult<ExecutionPayloadBodiesV1> {
        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
    }

    /// Handler for `engine_getClientVersionV1`
    ///
    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
    async fn get_client_version_v1(
        &self,
        client: ClientVersionV1,
    ) -> RpcResult<Vec<ClientVersionV1>> {
        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
        // Inherent items take precedence over trait items, so this path resolves to the
        // synchronous inherent method of the same name rather than to this handler.
        Ok(Self::get_client_version_v1(self, client)?)
    }

    /// Handler for `engine_exchangeCapabilitiesV1`
    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
    ///
    /// The capabilities sent by the caller are ignored; the response is the list of capabilities
    /// this API was configured with.
    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
        Ok(self.capabilities().list())
    }

    /// Handler for `engine_getBlobsV1`
    ///
    /// Delegates to `get_blobs_v1_metered`.
    async fn get_blobs_v1(
        &self,
        versioned_hashes: Vec<B256>,
    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
    }

    /// Handler for `engine_getBlobsV2`
    ///
    /// Delegates to `get_blobs_v2_metered`.
    async fn get_blobs_v2(
        &self,
        versioned_hashes: Vec<B256>,
    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
    }
}
1105
1106impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1107    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1108where
1109    EngineT: EngineTypes,
1110    Self: EngineApiServer<EngineT>,
1111{
1112    fn into_rpc_module(self) -> RpcModule<()> {
1113        self.into_rpc().remove_context()
1114    }
1115}
1116
1117impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1118    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1119where
1120    PayloadT: PayloadTypes,
1121{
1122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1123        f.debug_struct("EngineApi").finish_non_exhaustive()
1124    }
1125}
1126
1127impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1128    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1129where
1130    PayloadT: PayloadTypes,
1131{
1132    fn clone(&self) -> Self {
1133        Self { inner: Arc::clone(&self.inner) }
1134    }
1135}
1136
/// The container type for the engine API internals.
struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// The provider to interact with the chain.
    provider: Provider,
    /// Consensus configuration
    chain_spec: Arc<ChainSpec>,
    /// The channel to send messages to the beacon consensus engine.
    beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
    /// The type that can communicate with the payload service to retrieve payloads.
    payload_store: PayloadStore<PayloadT>,
    /// For spawning and executing async tasks
    task_spawner: Box<dyn TaskSpawner>,
    /// The latency and response type metrics for engine api calls
    metrics: EngineApiMetrics,
    /// Identification of the execution client used by the consensus client
    client: ClientVersionV1,
    /// The list of all supported Engine capabilities available over the engine endpoint.
    capabilities: EngineCapabilities,
    /// Transaction pool.
    tx_pool: Pool,
    /// Engine validator.
    validator: Validator,
    /// Instant of the latest new payload response, if any.
    ///
    /// Set by `on_new_payload_response` and taken (leaving `None`) by
    /// `record_elapsed_time_on_fcu`, which records the time elapsed since then.
    latest_new_payload_response: Mutex<Option<Instant>>,
    /// Whether `engine_newPayloadV4` accepts execution requests given as a hash.
    ///
    /// When `false`, such a call is rejected with `EngineApiError::UnexpectedRequestsHash`.
    accept_execution_requests_hash: bool,
}
1163
1164impl<Provider, PayloadT, Pool, Validator, ChainSpec>
1165    EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
1166where
1167    PayloadT: PayloadTypes,
1168{
1169    /// Tracks the elapsed time between the new payload response and the received forkchoice update
1170    /// request.
1171    fn record_elapsed_time_on_fcu(&self) {
1172        if let Some(start_time) = self.latest_new_payload_response.lock().take() {
1173            let elapsed_time = start_time.elapsed();
1174            self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
1175        }
1176    }
1177
1178    /// Updates the timestamp for the latest new payload response.
1179    fn on_new_payload_response(&self) {
1180        self.latest_new_payload_response.lock().replace(Instant::now());
1181    }
1182}
1183
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
    use assert_matches::assert_matches;
    use reth_chainspec::{ChainSpec, MAINNET};
    use reth_engine_primitives::BeaconEngineMessage;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_ethereum_primitives::Block;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_payload_builder::test_utils::spawn_test_payload_service;
    use reth_provider::test_utils::MockEthProvider;
    use reth_tasks::TokioTaskExecutor;
    use reth_transaction_pool::noop::NoopTransactionPool;
    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

    /// Builds an [`EngineApi`] for tests on top of a mock provider, a no-op transaction pool, the
    /// test payload service and the mainnet chain spec.
    ///
    /// Returns the API together with a handle that exposes the mock provider and the receiving
    /// end of the channel the API uses to reach the consensus engine.
    fn setup_engine_api() -> (
        EngineApiTestHandle,
        EngineApi<
            Arc<MockEthProvider>,
            EthEngineTypes,
            NoopTransactionPool,
            EthereumEngineValidator,
            ChainSpec,
        >,
    ) {
        let client = ClientVersionV1 {
            code: ClientCode::RH,
            name: "Reth".to_string(),
            version: "v0.2.0-beta.5".to_string(),
            commit: "defa64b2".to_string(),
        };

        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
        let provider = Arc::new(MockEthProvider::default());
        let payload_store = spawn_test_payload_service();
        let (to_engine, engine_rx) = unbounded_channel();
        let task_executor = Box::<TokioTaskExecutor>::default();
        let api = EngineApi::new(
            provider.clone(),
            chain_spec.clone(),
            BeaconConsensusEngineHandle::new(to_engine),
            payload_store.into(),
            NoopTransactionPool::default(),
            task_executor,
            client,
            EngineCapabilities::default(),
            EthereumEngineValidator::new(chain_spec.clone()),
            false,
        );
        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
        (handle, api)
    }

    /// The client version query answers with the version the API was constructed with (the
    /// request here carries the same value as `setup_engine_api` configures).
    #[tokio::test]
    async fn engine_client_version_v1() {
        let client = ClientVersionV1 {
            code: ClientCode::RH,
            name: "Reth".to_string(),
            version: "v0.2.0-beta.5".to_string(),
            commit: "defa64b2".to_string(),
        };
        let (_, api) = setup_engine_api();
        let res = api.get_client_version_v1(client.clone());
        assert_eq!(res.unwrap(), vec![client]);
    }

    /// Test-side counterpart of the [`EngineApi`] created by `setup_engine_api`.
    struct EngineApiTestHandle {
        // Stored on the handle but not read by any test in this module.
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        /// The mock provider the API reads blocks from.
        provider: Arc<MockEthProvider>,
        /// Receives the messages the API sends to the consensus engine.
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }

    /// A new payload submitted through the API shows up on the engine channel as a
    /// `BeaconEngineMessage::NewPayload` message.
    #[tokio::test]
    async fn forwards_responses_to_consensus_engine() {
        let (mut handle, api) = setup_engine_api();

        // The call is driven on a separate task while this test awaits the forwarded message.
        tokio::spawn(async move {
            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
            let execution_data = ExecutionData {
                payload: payload_v1.into(),
                sidecar: ExecutionPayloadSidecar::none(),
            };

            api.new_payload_v1(execution_data).await.unwrap();
        });
        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
    }

    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
    mod get_payload_bodies {
        use super::*;
        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

        /// A range request with a zero `start` or a zero `count` is rejected with
        /// `InvalidBodiesRange`.
        #[tokio::test]
        async fn invalid_params() {
            let (_, api) = setup_engine_api();

            let by_range_tests = [
                // (start, count)
                (0, 0),
                (0, 1),
                (1, 0),
            ];

            // test [EngineApiMessage::GetPayloadBodiesByRange]
            for (start, count) in by_range_tests {
                let res = api.get_payload_bodies_by_range_v1(start, count).await;
                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
            }
        }

        /// Requesting more than `MAX_PAYLOAD_BODIES_LIMIT` bodies is rejected with
        /// `PayloadRequestTooLarge`.
        #[tokio::test]
        async fn request_too_large() {
            let (_, api) = setup_engine_api();

            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
        }

        /// When every block of the requested range is known to the provider, the body of each
        /// block is returned in order.
        #[tokio::test]
        async fn returns_payload_bodies() {
            let mut rng = generators::rng();
            let (handle, api) = setup_engine_api();

            let (start, count) = (1, 10);
            let blocks = random_block_range(
                &mut rng,
                start..=start + count - 1,
                BlockRangeParams { tx_count: 0..2, ..Default::default() },
            );
            handle
                .provider
                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));

            let expected = blocks
                .iter()
                .cloned()
                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
                .collect::<Vec<_>>();

            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
            assert_eq!(res, expected);
        }

        /// Blocks missing from the provider are reported as `None`: the by-range request drops
        /// the trailing missing blocks, while the by-hash request keeps one entry per hash.
        #[tokio::test]
        async fn returns_payload_bodies_with_gaps() {
            let mut rng = generators::rng();
            let (handle, api) = setup_engine_api();

            let (start, count) = (1, 100);
            let blocks = random_block_range(
                &mut rng,
                start..=start + count - 1,
                BlockRangeParams { tx_count: 0..2, ..Default::default() },
            );

            // Insert only blocks in ranges 1-25 and 51-75
            let first_missing_range = 26..=50;
            let second_missing_range = 76..=100;
            handle.provider.extend_blocks(
                blocks
                    .iter()
                    .filter(|b| {
                        !first_missing_range.contains(&b.number) &&
                            !second_missing_range.contains(&b.number)
                    })
                    .map(|b| (b.hash(), b.clone().into_block())),
            );

            let expected = blocks
                .iter()
                // filter anything after the second missing range to ensure we don't expect trailing
                // `None`s
                .filter(|b| !second_missing_range.contains(&b.number))
                .cloned()
                .map(|b| {
                    if first_missing_range.contains(&b.number) {
                        None
                    } else {
                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
                    }
                })
                .collect::<Vec<_>>();

            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
            assert_eq!(res, expected);

            let expected = blocks
                .iter()
                .cloned()
                // ensure we still return trailing `None`s here because by-hash will not be aware
                // of the missing block's number, and cannot compare it to the current best block
                .map(|b| {
                    if first_missing_range.contains(&b.number) ||
                        second_missing_range.contains(&b.number)
                    {
                        None
                    } else {
                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
                    }
                })
                .collect::<Vec<_>>();

            let hashes = blocks.iter().map(|b| b.hash()).collect();
            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
            assert_eq!(res, expected);
        }
    }
}