reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24    validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload, PayloadOrAttributes,
25    PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{
33    sync::Arc,
34    time::{Instant, SystemTime},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace, warn};
38
39/// The Engine API response sender.
40pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
41
/// Maximum number of payload bodies that a single bodies-by-range or bodies-by-hash request
/// may ask for. Larger requests are rejected with `PayloadRequestTooLarge`.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// Maximum number of versioned hashes accepted by `engine_getBlobsVx`. Larger requests are
/// rejected with `BlobRequestTooLarge`.
const MAX_BLOB_LIMIT: usize = 128;
47
48/// The Engine API implementation that grants the Consensus layer access to data and
49/// functions in the Execution layer that are crucial for the consensus process.
50///
51/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
52/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
53/// are still follow the engine API model.
54///
55/// ## Implementers
56///
57/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
58/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
59/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
60/// endpoints (e.g. opstack).
61/// See also [`EngineApiServer`] implementation for this type which is the
62/// L1 implementation.
63pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
64    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
65}
66
67impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
68    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
69{
70    /// Returns the configured chainspec.
71    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
72        &self.inner.chain_spec
73    }
74}
75
76impl<Provider, PayloadT, Pool, Validator, ChainSpec>
77    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
78where
79    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
80    PayloadT: PayloadTypes,
81    Pool: TransactionPool + 'static,
82    Validator: EngineApiValidator<PayloadT>,
83    ChainSpec: EthereumHardforks + Send + Sync + 'static,
84{
85    /// Create new instance of [`EngineApi`].
86    #[expect(clippy::too_many_arguments)]
87    pub fn new(
88        provider: Provider,
89        chain_spec: Arc<ChainSpec>,
90        beacon_consensus: ConsensusEngineHandle<PayloadT>,
91        payload_store: PayloadStore<PayloadT>,
92        tx_pool: Pool,
93        task_spawner: Box<dyn TaskSpawner>,
94        client: ClientVersionV1,
95        capabilities: EngineCapabilities,
96        validator: Validator,
97        accept_execution_requests_hash: bool,
98    ) -> Self {
99        let inner = Arc::new(EngineApiInner {
100            provider,
101            chain_spec,
102            beacon_consensus,
103            payload_store,
104            task_spawner,
105            metrics: EngineApiMetrics::default(),
106            client,
107            capabilities,
108            tx_pool,
109            validator,
110            latest_new_payload_response: Mutex::new(None),
111            accept_execution_requests_hash,
112        });
113        Self { inner }
114    }
115
116    /// Fetches the client version.
117    pub fn get_client_version_v1(
118        &self,
119        _client: ClientVersionV1,
120    ) -> EngineApiResult<Vec<ClientVersionV1>> {
121        Ok(vec![self.inner.client.clone()])
122    }
123
124    /// Fetches the timestamp of the payload with the given id.
125    async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
126        Ok(self
127            .inner
128            .payload_store
129            .payload_timestamp(payload_id)
130            .await
131            .ok_or(EngineApiError::UnknownPayload)??)
132    }
133
134    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
135    /// Caution: This should not accept the `withdrawals` field
136    pub async fn new_payload_v1(
137        &self,
138        payload: PayloadT::ExecutionData,
139    ) -> EngineApiResult<PayloadStatus> {
140        let payload_or_attrs = PayloadOrAttributes::<
141            '_,
142            PayloadT::ExecutionData,
143            PayloadT::PayloadAttributes,
144        >::from_execution_payload(&payload);
145
146        self.inner
147            .validator
148            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
149
150        Ok(self
151            .inner
152            .beacon_consensus
153            .new_payload(payload)
154            .await
155            .inspect(|_| self.inner.on_new_payload_response())?)
156    }
157
158    /// Metered version of `new_payload_v1`.
159    pub async fn new_payload_v1_metered(
160        &self,
161        payload: PayloadT::ExecutionData,
162    ) -> EngineApiResult<PayloadStatus> {
163        let start = Instant::now();
164        let gas_used = payload.gas_used();
165
166        let res = Self::new_payload_v1(self, payload).await;
167        let elapsed = start.elapsed();
168        self.inner.metrics.latency.new_payload_v1.record(elapsed);
169        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
170        res
171    }
172
173    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
174    pub async fn new_payload_v2(
175        &self,
176        payload: PayloadT::ExecutionData,
177    ) -> EngineApiResult<PayloadStatus> {
178        let payload_or_attrs = PayloadOrAttributes::<
179            '_,
180            PayloadT::ExecutionData,
181            PayloadT::PayloadAttributes,
182        >::from_execution_payload(&payload);
183        self.inner
184            .validator
185            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
186        Ok(self
187            .inner
188            .beacon_consensus
189            .new_payload(payload)
190            .await
191            .inspect(|_| self.inner.on_new_payload_response())?)
192    }
193
194    /// Metered version of `new_payload_v2`.
195    pub async fn new_payload_v2_metered(
196        &self,
197        payload: PayloadT::ExecutionData,
198    ) -> EngineApiResult<PayloadStatus> {
199        let start = Instant::now();
200        let gas_used = payload.gas_used();
201
202        let res = Self::new_payload_v2(self, payload).await;
203        let elapsed = start.elapsed();
204        self.inner.metrics.latency.new_payload_v2.record(elapsed);
205        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
206        res
207    }
208
209    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
210    pub async fn new_payload_v3(
211        &self,
212        payload: PayloadT::ExecutionData,
213    ) -> EngineApiResult<PayloadStatus> {
214        let payload_or_attrs = PayloadOrAttributes::<
215            '_,
216            PayloadT::ExecutionData,
217            PayloadT::PayloadAttributes,
218        >::from_execution_payload(&payload);
219        self.inner
220            .validator
221            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
222
223        Ok(self
224            .inner
225            .beacon_consensus
226            .new_payload(payload)
227            .await
228            .inspect(|_| self.inner.on_new_payload_response())?)
229    }
230
231    /// Metrics version of `new_payload_v3`
232    pub async fn new_payload_v3_metered(
233        &self,
234        payload: PayloadT::ExecutionData,
235    ) -> RpcResult<PayloadStatus> {
236        let start = Instant::now();
237        let gas_used = payload.gas_used();
238
239        let res = Self::new_payload_v3(self, payload).await;
240        let elapsed = start.elapsed();
241        self.inner.metrics.latency.new_payload_v3.record(elapsed);
242        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
243        Ok(res?)
244    }
245
246    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
247    pub async fn new_payload_v4(
248        &self,
249        payload: PayloadT::ExecutionData,
250    ) -> EngineApiResult<PayloadStatus> {
251        let payload_or_attrs = PayloadOrAttributes::<
252            '_,
253            PayloadT::ExecutionData,
254            PayloadT::PayloadAttributes,
255        >::from_execution_payload(&payload);
256        self.inner
257            .validator
258            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
259
260        Ok(self
261            .inner
262            .beacon_consensus
263            .new_payload(payload)
264            .await
265            .inspect(|_| self.inner.on_new_payload_response())?)
266    }
267
268    /// Metrics version of `new_payload_v4`
269    pub async fn new_payload_v4_metered(
270        &self,
271        payload: PayloadT::ExecutionData,
272    ) -> RpcResult<PayloadStatus> {
273        let start = Instant::now();
274        let gas_used = payload.gas_used();
275
276        let res = Self::new_payload_v4(self, payload).await;
277
278        let elapsed = start.elapsed();
279        self.inner.metrics.latency.new_payload_v4.record(elapsed);
280        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
281        Ok(res?)
282    }
283
284    /// Returns whether the engine accepts execution requests hash.
285    pub fn accept_execution_requests_hash(&self) -> bool {
286        self.inner.accept_execution_requests_hash
287    }
288}
289
290impl<Provider, EngineT, Pool, Validator, ChainSpec>
291    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
292where
293    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
294    EngineT: EngineTypes,
295    Pool: TransactionPool + 'static,
296    Validator: EngineApiValidator<EngineT>,
297    ChainSpec: EthereumHardforks + Send + Sync + 'static,
298{
299    /// Sends a message to the beacon consensus engine to update the fork choice _without_
300    /// withdrawals.
301    ///
302    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
303    ///
304    /// Caution: This should not accept the `withdrawals` field
305    pub async fn fork_choice_updated_v1(
306        &self,
307        state: ForkchoiceState,
308        payload_attrs: Option<EngineT::PayloadAttributes>,
309    ) -> EngineApiResult<ForkchoiceUpdated> {
310        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
311            .await
312    }
313
314    /// Metrics version of `fork_choice_updated_v1`
315    pub async fn fork_choice_updated_v1_metered(
316        &self,
317        state: ForkchoiceState,
318        payload_attrs: Option<EngineT::PayloadAttributes>,
319    ) -> EngineApiResult<ForkchoiceUpdated> {
320        let start = Instant::now();
321        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
322        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
323        self.inner.metrics.fcu_response.update_response_metrics(&res);
324        res
325    }
326
327    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
328    /// but only _after_ shanghai.
329    ///
330    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
331    pub async fn fork_choice_updated_v2(
332        &self,
333        state: ForkchoiceState,
334        payload_attrs: Option<EngineT::PayloadAttributes>,
335    ) -> EngineApiResult<ForkchoiceUpdated> {
336        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
337            .await
338    }
339
340    /// Metrics version of `fork_choice_updated_v2`
341    pub async fn fork_choice_updated_v2_metered(
342        &self,
343        state: ForkchoiceState,
344        payload_attrs: Option<EngineT::PayloadAttributes>,
345    ) -> EngineApiResult<ForkchoiceUpdated> {
346        let start = Instant::now();
347        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
348        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
349        self.inner.metrics.fcu_response.update_response_metrics(&res);
350        res
351    }
352
353    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
354    /// but only _after_ cancun.
355    ///
356    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
357    pub async fn fork_choice_updated_v3(
358        &self,
359        state: ForkchoiceState,
360        payload_attrs: Option<EngineT::PayloadAttributes>,
361    ) -> EngineApiResult<ForkchoiceUpdated> {
362        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
363            .await
364    }
365
366    /// Metrics version of `fork_choice_updated_v3`
367    pub async fn fork_choice_updated_v3_metered(
368        &self,
369        state: ForkchoiceState,
370        payload_attrs: Option<EngineT::PayloadAttributes>,
371    ) -> EngineApiResult<ForkchoiceUpdated> {
372        let start = Instant::now();
373        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
374        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
375        self.inner.metrics.fcu_response.update_response_metrics(&res);
376        res
377    }
378
379    /// Helper function for retrieving the build payload by id.
380    async fn get_built_payload(
381        &self,
382        payload_id: PayloadId,
383    ) -> EngineApiResult<EngineT::BuiltPayload> {
384        self.inner
385            .payload_store
386            .resolve(payload_id)
387            .await
388            .ok_or(EngineApiError::UnknownPayload)?
389            .map_err(|_| EngineApiError::UnknownPayload)
390    }
391
392    /// Helper function for validating the payload timestamp and retrieving & converting the payload
393    /// into desired envelope.
394    async fn get_payload_inner<R>(
395        &self,
396        payload_id: PayloadId,
397        version: EngineApiMessageVersion,
398    ) -> EngineApiResult<R>
399    where
400        EngineT::BuiltPayload: TryInto<R>,
401    {
402        // validate timestamp according to engine rules
403        let timestamp = self.get_payload_timestamp(payload_id).await?;
404        validate_payload_timestamp(&self.inner.chain_spec, version, timestamp)?;
405
406        // Now resolve the payload
407        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
408            warn!(?version, "could not transform built payload");
409            EngineApiError::UnknownPayload
410        })
411    }
412
413    /// Returns the most recent version of the payload that is available in the corresponding
414    /// payload build process at the time of receiving this call.
415    ///
416    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
417    ///
418    /// Caution: This should not return the `withdrawals` field
419    ///
420    /// Note:
421    /// > Provider software MAY stop the corresponding build process after serving this call.
422    pub async fn get_payload_v1(
423        &self,
424        payload_id: PayloadId,
425    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
426        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
427            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
428            EngineApiError::UnknownPayload
429        })
430    }
431
432    /// Metrics version of `get_payload_v1`
433    pub async fn get_payload_v1_metered(
434        &self,
435        payload_id: PayloadId,
436    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
437        let start = Instant::now();
438        let res = Self::get_payload_v1(self, payload_id).await;
439        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
440        res
441    }
442
443    /// Returns the most recent version of the payload that is available in the corresponding
444    /// payload build process at the time of receiving this call.
445    ///
446    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
447    ///
448    /// Note:
449    /// > Provider software MAY stop the corresponding build process after serving this call.
450    pub async fn get_payload_v2(
451        &self,
452        payload_id: PayloadId,
453    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
454        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
455    }
456
457    /// Metrics version of `get_payload_v2`
458    pub async fn get_payload_v2_metered(
459        &self,
460        payload_id: PayloadId,
461    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
462        let start = Instant::now();
463        let res = Self::get_payload_v2(self, payload_id).await;
464        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
465        res
466    }
467
468    /// Returns the most recent version of the payload that is available in the corresponding
469    /// payload build process at the time of receiving this call.
470    ///
471    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
472    ///
473    /// Note:
474    /// > Provider software MAY stop the corresponding build process after serving this call.
475    pub async fn get_payload_v3(
476        &self,
477        payload_id: PayloadId,
478    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
479        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
480    }
481
482    /// Metrics version of `get_payload_v3`
483    pub async fn get_payload_v3_metered(
484        &self,
485        payload_id: PayloadId,
486    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
487        let start = Instant::now();
488        let res = Self::get_payload_v3(self, payload_id).await;
489        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
490        res
491    }
492
493    /// Returns the most recent version of the payload that is available in the corresponding
494    /// payload build process at the time of receiving this call.
495    ///
496    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
497    ///
498    /// Note:
499    /// > Provider software MAY stop the corresponding build process after serving this call.
500    pub async fn get_payload_v4(
501        &self,
502        payload_id: PayloadId,
503    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
504        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
505    }
506
507    /// Metrics version of `get_payload_v4`
508    pub async fn get_payload_v4_metered(
509        &self,
510        payload_id: PayloadId,
511    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
512        let start = Instant::now();
513        let res = Self::get_payload_v4(self, payload_id).await;
514        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
515        res
516    }
517
518    /// Handler for `engine_getPayloadV5`
519    ///
520    /// Returns the most recent version of the payload that is available in the corresponding
521    /// payload build process at the time of receiving this call.
522    ///
523    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
524    ///
525    /// Note:
526    /// > Provider software MAY stop the corresponding build process after serving this call.
527    pub async fn get_payload_v5(
528        &self,
529        payload_id: PayloadId,
530    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
531        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
532    }
533
534    /// Metrics version of `get_payload_v5`
535    pub async fn get_payload_v5_metered(
536        &self,
537        payload_id: PayloadId,
538    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
539        let start = Instant::now();
540        let res = Self::get_payload_v5(self, payload_id).await;
541        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
542        res
543    }
544
545    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
546    /// blocks and returns the mapped payload bodies.
547    pub async fn get_payload_bodies_by_range_with<F, R>(
548        &self,
549        start: BlockNumber,
550        count: u64,
551        f: F,
552    ) -> EngineApiResult<Vec<Option<R>>>
553    where
554        F: Fn(Provider::Block) -> R + Send + 'static,
555        R: Send + 'static,
556    {
557        let (tx, rx) = oneshot::channel();
558        let inner = self.inner.clone();
559
560        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
561            if count > MAX_PAYLOAD_BODIES_LIMIT {
562                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
563                return;
564            }
565
566            if start == 0 || count == 0 {
567                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
568                return;
569            }
570
571            let mut result = Vec::with_capacity(count as usize);
572
573            // -1 so range is inclusive
574            let mut end = start.saturating_add(count - 1);
575
576            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
577            // truncate the end if it's greater than the last block
578            if let Ok(best_block) = inner.provider.best_block_number()
579                && end > best_block {
580                    end = best_block;
581                }
582
583            // Check if the requested range starts before the earliest available block due to pruning/expiry
584            let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
585            for num in start..=end {
586                if num < earliest_block {
587                    result.push(None);
588                    continue;
589                }
590                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
591                match block_result {
592                    Ok(block) => {
593                        result.push(block.map(&f));
594                    }
595                    Err(err) => {
596                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
597                        return;
598                    }
599                };
600            }
601            tx.send(Ok(result)).ok();
602        }));
603
604        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
605    }
606
607    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
608    /// blocks.
609    ///
610    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
611    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
612    /// adversarial.
613    ///
614    /// Implementers should take care when acting on the input to this method, specifically
615    /// ensuring that the range is limited properly, and that the range boundaries are computed
616    /// correctly and without panics.
617    pub async fn get_payload_bodies_by_range_v1(
618        &self,
619        start: BlockNumber,
620        count: u64,
621    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
622        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
623            transactions: block.body().encoded_2718_transactions(),
624            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
625        })
626        .await
627    }
628
629    /// Metrics version of `get_payload_bodies_by_range_v1`
630    pub async fn get_payload_bodies_by_range_v1_metered(
631        &self,
632        start: BlockNumber,
633        count: u64,
634    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
635        let start_time = Instant::now();
636        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
637        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
638        res
639    }
640
641    /// Called to retrieve execution payload bodies by hashes.
642    pub async fn get_payload_bodies_by_hash_with<F, R>(
643        &self,
644        hashes: Vec<BlockHash>,
645        f: F,
646    ) -> EngineApiResult<Vec<Option<R>>>
647    where
648        F: Fn(Provider::Block) -> R + Send + 'static,
649        R: Send + 'static,
650    {
651        let len = hashes.len() as u64;
652        if len > MAX_PAYLOAD_BODIES_LIMIT {
653            return Err(EngineApiError::PayloadRequestTooLarge { len });
654        }
655
656        let (tx, rx) = oneshot::channel();
657        let inner = self.inner.clone();
658
659        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
660            let mut result = Vec::with_capacity(hashes.len());
661            for hash in hashes {
662                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
663                match block_result {
664                    Ok(block) => {
665                        result.push(block.map(&f));
666                    }
667                    Err(err) => {
668                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
669                        return;
670                    }
671                }
672            }
673            tx.send(Ok(result)).ok();
674        }));
675
676        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
677    }
678
679    /// Called to retrieve execution payload bodies by hashes.
680    pub async fn get_payload_bodies_by_hash_v1(
681        &self,
682        hashes: Vec<BlockHash>,
683    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
684        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
685            transactions: block.body().encoded_2718_transactions(),
686            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
687        })
688        .await
689    }
690
691    /// Metrics version of `get_payload_bodies_by_hash_v1`
692    pub async fn get_payload_bodies_by_hash_v1_metered(
693        &self,
694        hashes: Vec<BlockHash>,
695    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
696        let start = Instant::now();
697        let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
698        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
699        res.await
700    }
701
702    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
703    /// update.
704    ///
705    /// The payload attributes will be validated according to the engine API rules for the given
706    /// message version:
707    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
708    ///   validated according to the Paris rules.
709    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
710    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
711    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
712    ///
713    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
714    ///   validated according to the Cancun rules.
715    async fn validate_and_execute_forkchoice(
716        &self,
717        version: EngineApiMessageVersion,
718        state: ForkchoiceState,
719        payload_attrs: Option<EngineT::PayloadAttributes>,
720    ) -> EngineApiResult<ForkchoiceUpdated> {
721        self.inner.record_elapsed_time_on_fcu();
722
723        if let Some(ref attrs) = payload_attrs {
724            let attr_validation_res =
725                self.inner.validator.ensure_well_formed_attributes(version, attrs);
726
727            // From the engine API spec:
728            //
729            // Client software MUST ensure that payloadAttributes.timestamp is greater than
730            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
731            // isn't held client software MUST respond with -38003: Invalid payload attributes and
732            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
733            // update MUST NOT be rolled back.
734            //
735            // NOTE: This will also apply to the validation result for the cancun or
736            // shanghai-specific fields provided in the payload attributes.
737            //
738            // To do this, we set the payload attrs to `None` if attribute validation failed, but
739            // we still apply the forkchoice update.
740            if let Err(err) = attr_validation_res {
741                let fcu_res =
742                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
743                // TODO: decide if we want this branch - the FCU INVALID response might be more
744                // useful than the payload attributes INVALID response
745                if fcu_res.is_invalid() {
746                    return Ok(fcu_res)
747                }
748                return Err(err.into())
749            }
750        }
751
752        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
753    }
754
755    /// Returns reference to supported capabilities.
756    pub fn capabilities(&self) -> &EngineCapabilities {
757        &self.inner.capabilities
758    }
759
760    fn get_blobs_v1(
761        &self,
762        versioned_hashes: Vec<B256>,
763    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
764        // Only allow this method before Osaka fork
765        let current_timestamp =
766            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
767        if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
768            return Err(EngineApiError::EngineObjectValidationError(
769                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
770            ));
771        }
772
773        if versioned_hashes.len() > MAX_BLOB_LIMIT {
774            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
775        }
776
777        self.inner
778            .tx_pool
779            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
780            .map_err(|err| EngineApiError::Internal(Box::new(err)))
781    }
782
783    /// Metered version of `get_blobs_v1`.
784    pub fn get_blobs_v1_metered(
785        &self,
786        versioned_hashes: Vec<B256>,
787    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
788        let hashes_len = versioned_hashes.len();
789        let start = Instant::now();
790        let res = Self::get_blobs_v1(self, versioned_hashes);
791        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
792
793        if let Ok(blobs) = &res {
794            let blobs_found = blobs.iter().flatten().count();
795            let blobs_missed = hashes_len - blobs_found;
796
797            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
798            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
799        }
800
801        res
802    }
803
804    fn get_blobs_v2(
805        &self,
806        versioned_hashes: Vec<B256>,
807    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
808        // Check if Osaka fork is active
809        let current_timestamp =
810            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
811        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
812            return Err(EngineApiError::EngineObjectValidationError(
813                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
814            ));
815        }
816
817        if versioned_hashes.len() > MAX_BLOB_LIMIT {
818            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
819        }
820
821        self.inner
822            .tx_pool
823            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
824            .map_err(|err| EngineApiError::Internal(Box::new(err)))
825    }
826
827    /// Metered version of `get_blobs_v2`.
828    pub fn get_blobs_v2_metered(
829        &self,
830        versioned_hashes: Vec<B256>,
831    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
832        let hashes_len = versioned_hashes.len();
833        let start = Instant::now();
834        let res = Self::get_blobs_v2(self, versioned_hashes);
835        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
836
837        if let Ok(blobs) = &res {
838            let blobs_found = blobs.iter().flatten().count();
839
840            self.inner
841                .metrics
842                .blob_metrics
843                .get_blobs_requests_blobs_total
844                .increment(hashes_len as u64);
845            self.inner
846                .metrics
847                .blob_metrics
848                .get_blobs_requests_blobs_in_blobpool_total
849                .increment(blobs_found as u64);
850
851            if blobs_found == hashes_len {
852                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
853            } else {
854                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
855            }
856        } else {
857            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
858        }
859
860        res
861    }
862}
863
864// This is the concrete ethereum engine API implementation.
865#[async_trait]
866impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
867    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
868where
869    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
870    EngineT: EngineTypes<ExecutionData = ExecutionData>,
871    Pool: TransactionPool + 'static,
872    Validator: EngineApiValidator<EngineT>,
873    ChainSpec: EthereumHardforks + Send + Sync + 'static,
874{
875    /// Handler for `engine_newPayloadV1`
876    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
877    /// Caution: This should not accept the `withdrawals` field
878    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
879        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
880        let payload =
881            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
882        Ok(self.new_payload_v1_metered(payload).await?)
883    }
884
885    /// Handler for `engine_newPayloadV2`
886    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
887    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
888        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
889        let payload = ExecutionData {
890            payload: payload.into_payload(),
891            sidecar: ExecutionPayloadSidecar::none(),
892        };
893
894        Ok(self.new_payload_v2_metered(payload).await?)
895    }
896
897    /// Handler for `engine_newPayloadV3`
898    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
899    async fn new_payload_v3(
900        &self,
901        payload: ExecutionPayloadV3,
902        versioned_hashes: Vec<B256>,
903        parent_beacon_block_root: B256,
904    ) -> RpcResult<PayloadStatus> {
905        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
906        let payload = ExecutionData {
907            payload: payload.into(),
908            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
909                versioned_hashes,
910                parent_beacon_block_root,
911            }),
912        };
913
914        Ok(self.new_payload_v3_metered(payload).await?)
915    }
916
917    /// Handler for `engine_newPayloadV4`
918    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
919    async fn new_payload_v4(
920        &self,
921        payload: ExecutionPayloadV3,
922        versioned_hashes: Vec<B256>,
923        parent_beacon_block_root: B256,
924        requests: RequestsOrHash,
925    ) -> RpcResult<PayloadStatus> {
926        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
927
928        // Accept requests as a hash only if it is explicitly allowed
929        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
930            return Err(EngineApiError::UnexpectedRequestsHash.into());
931        }
932
933        let payload = ExecutionData {
934            payload: payload.into(),
935            sidecar: ExecutionPayloadSidecar::v4(
936                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
937                PraguePayloadFields { requests },
938            ),
939        };
940
941        Ok(self.new_payload_v4_metered(payload).await?)
942    }
943
944    /// Handler for `engine_forkchoiceUpdatedV1`
945    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
946    ///
947    /// Caution: This should not accept the `withdrawals` field
948    async fn fork_choice_updated_v1(
949        &self,
950        fork_choice_state: ForkchoiceState,
951        payload_attributes: Option<EngineT::PayloadAttributes>,
952    ) -> RpcResult<ForkchoiceUpdated> {
953        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
954        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
955    }
956
957    /// Handler for `engine_forkchoiceUpdatedV2`
958    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
959    async fn fork_choice_updated_v2(
960        &self,
961        fork_choice_state: ForkchoiceState,
962        payload_attributes: Option<EngineT::PayloadAttributes>,
963    ) -> RpcResult<ForkchoiceUpdated> {
964        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
965        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
966    }
967
968    /// Handler for `engine_forkchoiceUpdatedV2`
969    ///
970    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
971    async fn fork_choice_updated_v3(
972        &self,
973        fork_choice_state: ForkchoiceState,
974        payload_attributes: Option<EngineT::PayloadAttributes>,
975    ) -> RpcResult<ForkchoiceUpdated> {
976        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
977        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
978    }
979
980    /// Handler for `engine_getPayloadV1`
981    ///
982    /// Returns the most recent version of the payload that is available in the corresponding
983    /// payload build process at the time of receiving this call.
984    ///
985    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
986    ///
987    /// Caution: This should not return the `withdrawals` field
988    ///
989    /// Note:
990    /// > Provider software MAY stop the corresponding build process after serving this call.
991    async fn get_payload_v1(
992        &self,
993        payload_id: PayloadId,
994    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
995        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
996        Ok(self.get_payload_v1_metered(payload_id).await?)
997    }
998
999    /// Handler for `engine_getPayloadV2`
1000    ///
1001    /// Returns the most recent version of the payload that is available in the corresponding
1002    /// payload build process at the time of receiving this call.
1003    ///
1004    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
1005    ///
1006    /// Note:
1007    /// > Provider software MAY stop the corresponding build process after serving this call.
1008    async fn get_payload_v2(
1009        &self,
1010        payload_id: PayloadId,
1011    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1012        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1013        Ok(self.get_payload_v2_metered(payload_id).await?)
1014    }
1015
1016    /// Handler for `engine_getPayloadV3`
1017    ///
1018    /// Returns the most recent version of the payload that is available in the corresponding
1019    /// payload build process at the time of receiving this call.
1020    ///
1021    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
1022    ///
1023    /// Note:
1024    /// > Provider software MAY stop the corresponding build process after serving this call.
1025    async fn get_payload_v3(
1026        &self,
1027        payload_id: PayloadId,
1028    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1029        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1030        Ok(self.get_payload_v3_metered(payload_id).await?)
1031    }
1032
1033    /// Handler for `engine_getPayloadV4`
1034    ///
1035    /// Returns the most recent version of the payload that is available in the corresponding
1036    /// payload build process at the time of receiving this call.
1037    ///
1038    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1039    ///
1040    /// Note:
1041    /// > Provider software MAY stop the corresponding build process after serving this call.
1042    async fn get_payload_v4(
1043        &self,
1044        payload_id: PayloadId,
1045    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1046        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1047        Ok(self.get_payload_v4_metered(payload_id).await?)
1048    }
1049
1050    /// Handler for `engine_getPayloadV5`
1051    ///
1052    /// Returns the most recent version of the payload that is available in the corresponding
1053    /// payload build process at the time of receiving this call.
1054    ///
1055    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1056    ///
1057    /// Note:
1058    /// > Provider software MAY stop the corresponding build process after serving this call.
1059    async fn get_payload_v5(
1060        &self,
1061        payload_id: PayloadId,
1062    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1063        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1064        Ok(self.get_payload_v5_metered(payload_id).await?)
1065    }
1066
1067    /// Handler for `engine_getPayloadBodiesByHashV1`
1068    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1069    async fn get_payload_bodies_by_hash_v1(
1070        &self,
1071        block_hashes: Vec<BlockHash>,
1072    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1073        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1074        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1075    }
1076
1077    /// Handler for `engine_getPayloadBodiesByRangeV1`
1078    ///
1079    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1080    ///
1081    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1082    /// blocks.
1083    ///
1084    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
1085    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1086    /// adversarial.
1087    ///
1088    /// Implementers should take care when acting on the input to this method, specifically
1089    /// ensuring that the range is limited properly, and that the range boundaries are computed
1090    /// correctly and without panics.
1091    ///
1092    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1093    async fn get_payload_bodies_by_range_v1(
1094        &self,
1095        start: U64,
1096        count: U64,
1097    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1098        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1099        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1100    }
1101
1102    /// Handler for `engine_getClientVersionV1`
1103    ///
1104    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1105    async fn get_client_version_v1(
1106        &self,
1107        client: ClientVersionV1,
1108    ) -> RpcResult<Vec<ClientVersionV1>> {
1109        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1110        Ok(Self::get_client_version_v1(self, client)?)
1111    }
1112
1113    /// Handler for `engine_exchangeCapabilitiesV1`
1114    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1115    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1116        Ok(self.capabilities().list())
1117    }
1118
1119    async fn get_blobs_v1(
1120        &self,
1121        versioned_hashes: Vec<B256>,
1122    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1123        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1124        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1125    }
1126
1127    async fn get_blobs_v2(
1128        &self,
1129        versioned_hashes: Vec<B256>,
1130    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1131        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1132        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1133    }
1134}
1135
1136impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1137    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1138where
1139    EngineT: EngineTypes,
1140    Self: EngineApiServer<EngineT>,
1141{
1142    fn into_rpc_module(self) -> RpcModule<()> {
1143        self.into_rpc().remove_context()
1144    }
1145}
1146
1147impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1148    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1149where
1150    PayloadT: PayloadTypes,
1151{
1152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1153        f.debug_struct("EngineApi").finish_non_exhaustive()
1154    }
1155}
1156
1157impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1158    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1159where
1160    PayloadT: PayloadTypes,
1161{
1162    fn clone(&self) -> Self {
1163        Self { inner: Arc::clone(&self.inner) }
1164    }
1165}
1166
1167/// The container type for the engine API internals.
1168struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1169    /// The provider to interact with the chain.
1170    provider: Provider,
1171    /// Consensus configuration
1172    chain_spec: Arc<ChainSpec>,
1173    /// The channel to send messages to the beacon consensus engine.
1174    beacon_consensus: ConsensusEngineHandle<PayloadT>,
1175    /// The type that can communicate with the payload service to retrieve payloads.
1176    payload_store: PayloadStore<PayloadT>,
1177    /// For spawning and executing async tasks
1178    task_spawner: Box<dyn TaskSpawner>,
1179    /// The latency and response type metrics for engine api calls
1180    metrics: EngineApiMetrics,
1181    /// Identification of the execution client used by the consensus client
1182    client: ClientVersionV1,
1183    /// The list of all supported Engine capabilities available over the engine endpoint.
1184    capabilities: EngineCapabilities,
1185    /// Transaction pool.
1186    tx_pool: Pool,
1187    /// Engine validator.
1188    validator: Validator,
1189    /// Start time of the latest payload request
1190    latest_new_payload_response: Mutex<Option<Instant>>,
1191    accept_execution_requests_hash: bool,
1192}
1193
1194impl<Provider, PayloadT, Pool, Validator, ChainSpec>
1195    EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
1196where
1197    PayloadT: PayloadTypes,
1198{
1199    /// Tracks the elapsed time between the new payload response and the received forkchoice update
1200    /// request.
1201    fn record_elapsed_time_on_fcu(&self) {
1202        if let Some(start_time) = self.latest_new_payload_response.lock().take() {
1203            let elapsed_time = start_time.elapsed();
1204            self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
1205        }
1206    }
1207
1208    /// Updates the timestamp for the latest new payload response.
1209    fn on_new_payload_response(&self) {
1210        self.latest_new_payload_response.lock().replace(Instant::now());
1211    }
1212}
1213
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
    use assert_matches::assert_matches;
    use reth_chainspec::{ChainSpec, MAINNET};
    use reth_engine_primitives::BeaconEngineMessage;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_ethereum_primitives::Block;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_payload_builder::test_utils::spawn_test_payload_service;
    use reth_provider::test_utils::MockEthProvider;
    use reth_tasks::TokioTaskExecutor;
    use reth_transaction_pool::noop::NoopTransactionPool;
    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

    /// The engine API wired up with the mock components used by these tests.
    type TestEngineApi = EngineApi<
        Arc<MockEthProvider>,
        EthEngineTypes,
        NoopTransactionPool,
        EthereumEngineValidator,
        ChainSpec,
    >;

    /// Test-side counterparts of the components handed to the API under test.
    struct EngineApiTestHandle {
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        provider: Arc<MockEthProvider>,
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }

    /// The client version the test API is configured with.
    fn reth_client_version() -> ClientVersionV1 {
        ClientVersionV1 {
            code: ClientCode::RH,
            name: "Reth".to_string(),
            version: "v0.2.0-beta.5".to_string(),
            commit: "defa64b2".to_string(),
        }
    }

    /// Builds an engine API on top of the mainnet chain spec, a mock provider and a noop pool,
    /// together with the handle that observes what the API sends to the consensus engine.
    fn setup_engine_api() -> (EngineApiTestHandle, TestEngineApi) {
        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
        let provider = Arc::new(MockEthProvider::default());
        let payload_store = spawn_test_payload_service();
        let (to_engine, from_api) = unbounded_channel();

        let api = EngineApi::new(
            provider.clone(),
            chain_spec.clone(),
            ConsensusEngineHandle::new(to_engine),
            payload_store.into(),
            NoopTransactionPool::default(),
            Box::<TokioTaskExecutor>::default(),
            reth_client_version(),
            EngineCapabilities::default(),
            EthereumEngineValidator::new(chain_spec.clone()),
            false,
        );

        (EngineApiTestHandle { chain_spec, provider, from_api }, api)
    }

    #[tokio::test]
    async fn engine_client_version_v1() {
        let (_, api) = setup_engine_api();

        let expected = reth_client_version();
        let reported = api.get_client_version_v1(expected.clone()).unwrap();
        assert_eq!(reported, vec![expected]);
    }

    #[tokio::test]
    async fn forwards_responses_to_consensus_engine() {
        let (mut handle, api) = setup_engine_api();

        tokio::spawn(async move {
            let execution_data = ExecutionData {
                payload: ExecutionPayloadV1::from_block_slow(&Block::default()).into(),
                sidecar: ExecutionPayloadSidecar::none(),
            };

            api.new_payload_v1(execution_data).await.unwrap();
        });

        let received = handle.from_api.recv().await;
        assert_matches!(received, Some(BeaconEngineMessage::NewPayload { .. }));
    }

    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
    mod get_payload_bodies {
        use super::*;
        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

        #[tokio::test]
        async fn invalid_params() {
            let (_, api) = setup_engine_api();

            // `(start, count)` pairs that must be rejected as an invalid bodies range
            let rejected_ranges = [(0, 0), (0, 1), (1, 0)];

            for (start, count) in rejected_ranges {
                let outcome = api.get_payload_bodies_by_range_v1(start, count).await;
                assert_matches!(outcome, Err(EngineApiError::InvalidBodiesRange { .. }));
            }
        }

        #[tokio::test]
        async fn request_too_large() {
            let (_, api) = setup_engine_api();

            // one more than the limit
            let oversized_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
            let outcome = api.get_payload_bodies_by_range_v1(0, oversized_count).await;
            assert_matches!(outcome, Err(EngineApiError::PayloadRequestTooLarge { .. }));
        }

        #[tokio::test]
        async fn returns_payload_bodies() {
            let mut rng = generators::rng();
            let (handle, api) = setup_engine_api();

            let (start, count) = (1, 10);
            let last = start + count - 1;
            let params = BlockRangeParams { tx_count: 0..2, ..Default::default() };
            let blocks = random_block_range(&mut rng, start..=last, params);

            // make every generated block available to the provider
            let stored = blocks.iter().cloned().map(|block| (block.hash(), block.into_block()));
            handle.provider.extend_blocks(stored);

            let expected: Vec<_> = blocks
                .iter()
                .map(|block| Some(ExecutionPayloadBodyV1::from_block(block.clone().into_block())))
                .collect();

            let bodies = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
            assert_eq!(bodies, expected);
        }

        #[tokio::test]
        async fn returns_payload_bodies_with_gaps() {
            let mut rng = generators::rng();
            let (handle, api) = setup_engine_api();

            let (start, count) = (1, 100);
            let last = start + count - 1;
            let params = BlockRangeParams { tx_count: 0..2, ..Default::default() };
            let blocks = random_block_range(&mut rng, start..=last, params);

            // Insert only blocks in ranges 1-25 and 51-75
            let first_missing_range = 26..=50;
            let second_missing_range = 76..=100;
            handle.provider.extend_blocks(
                blocks
                    .iter()
                    .filter(|b| {
                        !(first_missing_range.contains(&b.number) ||
                            second_missing_range.contains(&b.number))
                    })
                    .map(|b| (b.hash(), b.clone().into_block())),
            );

            // by-range: the second missing range is dropped from the expectation entirely, to
            // ensure we don't expect trailing `None`s
            let expected_by_range: Vec<_> = blocks
                .iter()
                .filter(|b| !second_missing_range.contains(&b.number))
                .cloned()
                .map(|b| {
                    let present = !first_missing_range.contains(&b.number);
                    present.then(|| ExecutionPayloadBodyV1::from_block(b.into_block()))
                })
                .collect();

            let by_range = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
            assert_eq!(by_range, expected_by_range);

            // by-hash: trailing `None`s are still returned here because by-hash will not be aware
            // of the missing block's number, and cannot compare it to the current best block
            let expected_by_hash: Vec<_> = blocks
                .iter()
                .cloned()
                .map(|b| {
                    let missing = first_missing_range.contains(&b.number) ||
                        second_missing_range.contains(&b.number);
                    if missing {
                        None
                    } else {
                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
                    }
                })
                .collect();

            let hashes = blocks.iter().map(|b| b.hash()).collect();
            let by_hash = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
            assert_eq!(by_hash, expected_by_hash);
        }
    }
}