Skip to main content

reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14    ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15    ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16    PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::NetworkInfo;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25    validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26    PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::TaskSpawner;
32use reth_transaction_pool::TransactionPool;
33use std::{
34    sync::Arc,
35    time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
40/// The Engine API response sender.
41pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
43/// The upper limit for payload bodies request.
44const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
45
46/// The upper limit for blobs in `engine_getBlobsVx`.
47const MAX_BLOB_LIMIT: usize = 128;
48
49/// The Engine API implementation that grants the Consensus layer access to data and
50/// functions in the Execution layer that are crucial for the consensus process.
51///
52/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
53/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
54/// are still follow the engine API model.
55///
56/// ## Implementers
57///
58/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
59/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
60/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
61/// endpoints (e.g. opstack).
62/// See also [`EngineApiServer`] implementation for this type which is the
63/// L1 implementation.
64pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
65    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
66}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71    /// Returns the configured chainspec.
72    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73        &self.inner.chain_spec
74    }
75}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
81    PayloadT: PayloadTypes,
82    Pool: TransactionPool + 'static,
83    Validator: EngineApiValidator<PayloadT>,
84    ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86    /// Create new instance of [`EngineApi`].
87    #[expect(clippy::too_many_arguments)]
88    pub fn new(
89        provider: Provider,
90        chain_spec: Arc<ChainSpec>,
91        beacon_consensus: ConsensusEngineHandle<PayloadT>,
92        payload_store: PayloadStore<PayloadT>,
93        tx_pool: Pool,
94        task_spawner: Box<dyn TaskSpawner>,
95        client: ClientVersionV1,
96        capabilities: EngineCapabilities,
97        validator: Validator,
98        accept_execution_requests_hash: bool,
99        network: impl NetworkInfo + 'static,
100    ) -> Self {
101        let is_syncing = Arc::new(move || network.is_syncing());
102        let inner = Arc::new(EngineApiInner {
103            provider,
104            chain_spec,
105            beacon_consensus,
106            payload_store,
107            task_spawner,
108            metrics: EngineApiMetrics::default(),
109            client,
110            capabilities,
111            tx_pool,
112            validator,
113            accept_execution_requests_hash,
114            is_syncing,
115        });
116        Self { inner }
117    }
118
119    /// Fetches the client version.
120    pub fn get_client_version_v1(
121        &self,
122        _client: ClientVersionV1,
123    ) -> EngineApiResult<Vec<ClientVersionV1>> {
124        Ok(vec![self.inner.client.clone()])
125    }
126
127    /// Fetches the timestamp of the payload with the given id.
128    async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
129        Ok(self
130            .inner
131            .payload_store
132            .payload_timestamp(payload_id)
133            .await
134            .ok_or(EngineApiError::UnknownPayload)??)
135    }
136
137    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
138    /// Caution: This should not accept the `withdrawals` field
139    pub async fn new_payload_v1(
140        &self,
141        payload: PayloadT::ExecutionData,
142    ) -> EngineApiResult<PayloadStatus> {
143        let payload_or_attrs = PayloadOrAttributes::<
144            '_,
145            PayloadT::ExecutionData,
146            PayloadT::PayloadAttributes,
147        >::from_execution_payload(&payload);
148
149        self.inner
150            .validator
151            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
154    }
155
156    /// Metered version of `new_payload_v1`.
157    pub async fn new_payload_v1_metered(
158        &self,
159        payload: PayloadT::ExecutionData,
160    ) -> EngineApiResult<PayloadStatus> {
161        let start = Instant::now();
162        let res = Self::new_payload_v1(self, payload).await;
163        let elapsed = start.elapsed();
164        self.inner.metrics.latency.new_payload_v1.record(elapsed);
165        res
166    }
167
168    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
169    pub async fn new_payload_v2(
170        &self,
171        payload: PayloadT::ExecutionData,
172    ) -> EngineApiResult<PayloadStatus> {
173        let payload_or_attrs = PayloadOrAttributes::<
174            '_,
175            PayloadT::ExecutionData,
176            PayloadT::PayloadAttributes,
177        >::from_execution_payload(&payload);
178        self.inner
179            .validator
180            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
181        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
182    }
183
184    /// Metered version of `new_payload_v2`.
185    pub async fn new_payload_v2_metered(
186        &self,
187        payload: PayloadT::ExecutionData,
188    ) -> EngineApiResult<PayloadStatus> {
189        let start = Instant::now();
190        let res = Self::new_payload_v2(self, payload).await;
191        let elapsed = start.elapsed();
192        self.inner.metrics.latency.new_payload_v2.record(elapsed);
193        res
194    }
195
196    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
197    pub async fn new_payload_v3(
198        &self,
199        payload: PayloadT::ExecutionData,
200    ) -> EngineApiResult<PayloadStatus> {
201        let payload_or_attrs = PayloadOrAttributes::<
202            '_,
203            PayloadT::ExecutionData,
204            PayloadT::PayloadAttributes,
205        >::from_execution_payload(&payload);
206        self.inner
207            .validator
208            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
209
210        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
211    }
212
213    /// Metrics version of `new_payload_v3`
214    pub async fn new_payload_v3_metered(
215        &self,
216        payload: PayloadT::ExecutionData,
217    ) -> RpcResult<PayloadStatus> {
218        let start = Instant::now();
219
220        let res = Self::new_payload_v3(self, payload).await;
221        let elapsed = start.elapsed();
222        self.inner.metrics.latency.new_payload_v3.record(elapsed);
223        Ok(res?)
224    }
225
226    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
227    pub async fn new_payload_v4(
228        &self,
229        payload: PayloadT::ExecutionData,
230    ) -> EngineApiResult<PayloadStatus> {
231        let payload_or_attrs = PayloadOrAttributes::<
232            '_,
233            PayloadT::ExecutionData,
234            PayloadT::PayloadAttributes,
235        >::from_execution_payload(&payload);
236        self.inner
237            .validator
238            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
239
240        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
241    }
242
243    /// Metrics version of `new_payload_v4`
244    pub async fn new_payload_v4_metered(
245        &self,
246        payload: PayloadT::ExecutionData,
247    ) -> RpcResult<PayloadStatus> {
248        let start = Instant::now();
249        let res = Self::new_payload_v4(self, payload).await;
250
251        let elapsed = start.elapsed();
252        self.inner.metrics.latency.new_payload_v4.record(elapsed);
253        Ok(res?)
254    }
255
256    /// Returns whether the engine accepts execution requests hash.
257    pub fn accept_execution_requests_hash(&self) -> bool {
258        self.inner.accept_execution_requests_hash
259    }
260}
261
262impl<Provider, EngineT, Pool, Validator, ChainSpec>
263    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
264where
265    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
266    EngineT: EngineTypes,
267    Pool: TransactionPool + 'static,
268    Validator: EngineApiValidator<EngineT>,
269    ChainSpec: EthereumHardforks + Send + Sync + 'static,
270{
271    /// Sends a message to the beacon consensus engine to update the fork choice _without_
272    /// withdrawals.
273    ///
274    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
275    ///
276    /// Caution: This should not accept the `withdrawals` field
277    pub async fn fork_choice_updated_v1(
278        &self,
279        state: ForkchoiceState,
280        payload_attrs: Option<EngineT::PayloadAttributes>,
281    ) -> EngineApiResult<ForkchoiceUpdated> {
282        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
283            .await
284    }
285
286    /// Metrics version of `fork_choice_updated_v1`
287    pub async fn fork_choice_updated_v1_metered(
288        &self,
289        state: ForkchoiceState,
290        payload_attrs: Option<EngineT::PayloadAttributes>,
291    ) -> EngineApiResult<ForkchoiceUpdated> {
292        let start = Instant::now();
293        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
294        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
295        res
296    }
297
298    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
299    /// but only _after_ shanghai.
300    ///
301    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
302    pub async fn fork_choice_updated_v2(
303        &self,
304        state: ForkchoiceState,
305        payload_attrs: Option<EngineT::PayloadAttributes>,
306    ) -> EngineApiResult<ForkchoiceUpdated> {
307        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
308            .await
309    }
310
311    /// Metrics version of `fork_choice_updated_v2`
312    pub async fn fork_choice_updated_v2_metered(
313        &self,
314        state: ForkchoiceState,
315        payload_attrs: Option<EngineT::PayloadAttributes>,
316    ) -> EngineApiResult<ForkchoiceUpdated> {
317        let start = Instant::now();
318        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
319        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
320        res
321    }
322
323    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
324    /// but only _after_ cancun.
325    ///
326    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
327    pub async fn fork_choice_updated_v3(
328        &self,
329        state: ForkchoiceState,
330        payload_attrs: Option<EngineT::PayloadAttributes>,
331    ) -> EngineApiResult<ForkchoiceUpdated> {
332        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
333            .await
334    }
335
336    /// Metrics version of `fork_choice_updated_v3`
337    pub async fn fork_choice_updated_v3_metered(
338        &self,
339        state: ForkchoiceState,
340        payload_attrs: Option<EngineT::PayloadAttributes>,
341    ) -> EngineApiResult<ForkchoiceUpdated> {
342        let start = Instant::now();
343        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
344        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
345        res
346    }
347
348    /// Helper function for retrieving the build payload by id.
349    async fn get_built_payload(
350        &self,
351        payload_id: PayloadId,
352    ) -> EngineApiResult<EngineT::BuiltPayload> {
353        self.inner
354            .payload_store
355            .resolve(payload_id)
356            .await
357            .ok_or(EngineApiError::UnknownPayload)?
358            .map_err(|_| EngineApiError::UnknownPayload)
359    }
360
361    /// Helper function for validating the payload timestamp and retrieving & converting the payload
362    /// into desired envelope.
363    async fn get_payload_inner<R>(
364        &self,
365        payload_id: PayloadId,
366        version: EngineApiMessageVersion,
367    ) -> EngineApiResult<R>
368    where
369        EngineT::BuiltPayload: TryInto<R>,
370    {
371        // Validate timestamp according to engine rules
372        // Enforces Osaka restrictions on `getPayloadV4`.
373        let timestamp = self.get_payload_timestamp(payload_id).await?;
374        validate_payload_timestamp(
375            &self.inner.chain_spec,
376            version,
377            timestamp,
378            MessageValidationKind::GetPayload,
379        )?;
380
381        // Now resolve the payload
382        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
383            warn!(?version, "could not transform built payload");
384            EngineApiError::UnknownPayload
385        })
386    }
387
388    /// Returns the most recent version of the payload that is available in the corresponding
389    /// payload build process at the time of receiving this call.
390    ///
391    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
392    ///
393    /// Caution: This should not return the `withdrawals` field
394    ///
395    /// Note:
396    /// > Provider software MAY stop the corresponding build process after serving this call.
397    pub async fn get_payload_v1(
398        &self,
399        payload_id: PayloadId,
400    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
401        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
402            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
403            EngineApiError::UnknownPayload
404        })
405    }
406
407    /// Metrics version of `get_payload_v1`
408    pub async fn get_payload_v1_metered(
409        &self,
410        payload_id: PayloadId,
411    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
412        let start = Instant::now();
413        let res = Self::get_payload_v1(self, payload_id).await;
414        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
415        res
416    }
417
418    /// Returns the most recent version of the payload that is available in the corresponding
419    /// payload build process at the time of receiving this call.
420    ///
421    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
422    ///
423    /// Note:
424    /// > Provider software MAY stop the corresponding build process after serving this call.
425    pub async fn get_payload_v2(
426        &self,
427        payload_id: PayloadId,
428    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
429        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
430    }
431
432    /// Metrics version of `get_payload_v2`
433    pub async fn get_payload_v2_metered(
434        &self,
435        payload_id: PayloadId,
436    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
437        let start = Instant::now();
438        let res = Self::get_payload_v2(self, payload_id).await;
439        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
440        res
441    }
442
443    /// Returns the most recent version of the payload that is available in the corresponding
444    /// payload build process at the time of receiving this call.
445    ///
446    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
447    ///
448    /// Note:
449    /// > Provider software MAY stop the corresponding build process after serving this call.
450    pub async fn get_payload_v3(
451        &self,
452        payload_id: PayloadId,
453    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
454        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
455    }
456
457    /// Metrics version of `get_payload_v3`
458    pub async fn get_payload_v3_metered(
459        &self,
460        payload_id: PayloadId,
461    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
462        let start = Instant::now();
463        let res = Self::get_payload_v3(self, payload_id).await;
464        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
465        res
466    }
467
468    /// Returns the most recent version of the payload that is available in the corresponding
469    /// payload build process at the time of receiving this call.
470    ///
471    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_getpayloadv4>
472    ///
473    /// Note:
474    /// > Provider software MAY stop the corresponding build process after serving this call.
475    pub async fn get_payload_v4(
476        &self,
477        payload_id: PayloadId,
478    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
479        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
480    }
481
482    /// Metrics version of `get_payload_v4`
483    pub async fn get_payload_v4_metered(
484        &self,
485        payload_id: PayloadId,
486    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
487        let start = Instant::now();
488        let res = Self::get_payload_v4(self, payload_id).await;
489        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
490        res
491    }
492
493    /// Handler for `engine_getPayloadV5`
494    ///
495    /// Returns the most recent version of the payload that is available in the corresponding
496    /// payload build process at the time of receiving this call.
497    ///
498    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
499    ///
500    /// Note:
501    /// > Provider software MAY stop the corresponding build process after serving this call.
502    pub async fn get_payload_v5(
503        &self,
504        payload_id: PayloadId,
505    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
506        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
507    }
508
509    /// Metrics version of `get_payload_v5`
510    pub async fn get_payload_v5_metered(
511        &self,
512        payload_id: PayloadId,
513    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
514        let start = Instant::now();
515        let res = Self::get_payload_v5(self, payload_id).await;
516        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
517        res
518    }
519
520    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
521    /// blocks and returns the mapped payload bodies.
522    pub async fn get_payload_bodies_by_range_with<F, R>(
523        &self,
524        start: BlockNumber,
525        count: u64,
526        f: F,
527    ) -> EngineApiResult<Vec<Option<R>>>
528    where
529        F: Fn(Provider::Block) -> R + Send + 'static,
530        R: Send + 'static,
531    {
532        let (tx, rx) = oneshot::channel();
533        let inner = self.inner.clone();
534
535        self.inner.task_spawner.spawn_blocking_task(Box::pin(async move {
536            if count > MAX_PAYLOAD_BODIES_LIMIT {
537                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
538                return;
539            }
540
541            if start == 0 || count == 0 {
542                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
543                return;
544            }
545
546            let mut result = Vec::with_capacity(count as usize);
547
548            // -1 so range is inclusive
549            let mut end = start.saturating_add(count - 1);
550
551            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
552            // truncate the end if it's greater than the last block
553            if let Ok(best_block) = inner.provider.best_block_number()
554                && end > best_block {
555                    end = best_block;
556                }
557
558            // Check if the requested range starts before the earliest available block due to pruning/expiry
559            let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
560            for num in start..=end {
561                if num < earliest_block {
562                    result.push(None);
563                    continue;
564                }
565                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
566                match block_result {
567                    Ok(block) => {
568                        result.push(block.map(&f));
569                    }
570                    Err(err) => {
571                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
572                        return;
573                    }
574                };
575            }
576            tx.send(Ok(result)).ok();
577        }));
578
579        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
580    }
581
582    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
583    /// blocks.
584    ///
585    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
586    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
587    /// adversarial.
588    ///
589    /// Implementers should take care when acting on the input to this method, specifically
590    /// ensuring that the range is limited properly, and that the range boundaries are computed
591    /// correctly and without panics.
592    pub async fn get_payload_bodies_by_range_v1(
593        &self,
594        start: BlockNumber,
595        count: u64,
596    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
597        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
598            transactions: block.body().encoded_2718_transactions(),
599            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
600        })
601        .await
602    }
603
604    /// Metrics version of `get_payload_bodies_by_range_v1`
605    pub async fn get_payload_bodies_by_range_v1_metered(
606        &self,
607        start: BlockNumber,
608        count: u64,
609    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
610        let start_time = Instant::now();
611        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
612        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
613        res
614    }
615
616    /// Returns the execution payload bodies by the range (V2).
617    ///
618    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
619    pub async fn get_payload_bodies_by_range_v2(
620        &self,
621        start: BlockNumber,
622        count: u64,
623    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
624        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 {
625            transactions: block.body().encoded_2718_transactions(),
626            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
627            block_access_list: None,
628        })
629        .await
630    }
631
632    /// Metrics version of `get_payload_bodies_by_range_v2`
633    pub async fn get_payload_bodies_by_range_v2_metered(
634        &self,
635        start: BlockNumber,
636        count: u64,
637    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
638        let start_time = Instant::now();
639        let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
640        self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
641        res
642    }
643
644    /// Called to retrieve execution payload bodies by hashes.
645    pub async fn get_payload_bodies_by_hash_with<F, R>(
646        &self,
647        hashes: Vec<BlockHash>,
648        f: F,
649    ) -> EngineApiResult<Vec<Option<R>>>
650    where
651        F: Fn(Provider::Block) -> R + Send + 'static,
652        R: Send + 'static,
653    {
654        let len = hashes.len() as u64;
655        if len > MAX_PAYLOAD_BODIES_LIMIT {
656            return Err(EngineApiError::PayloadRequestTooLarge { len });
657        }
658
659        let (tx, rx) = oneshot::channel();
660        let inner = self.inner.clone();
661
662        self.inner.task_spawner.spawn_blocking_task(Box::pin(async move {
663            let mut result = Vec::with_capacity(hashes.len());
664            for hash in hashes {
665                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
666                match block_result {
667                    Ok(block) => {
668                        result.push(block.map(&f));
669                    }
670                    Err(err) => {
671                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
672                        return;
673                    }
674                }
675            }
676            tx.send(Ok(result)).ok();
677        }));
678
679        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
680    }
681
682    /// Called to retrieve execution payload bodies by hashes.
683    pub async fn get_payload_bodies_by_hash_v1(
684        &self,
685        hashes: Vec<BlockHash>,
686    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
687        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
688            transactions: block.body().encoded_2718_transactions(),
689            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
690        })
691        .await
692    }
693
694    /// Metrics version of `get_payload_bodies_by_hash_v1`
695    pub async fn get_payload_bodies_by_hash_v1_metered(
696        &self,
697        hashes: Vec<BlockHash>,
698    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
699        let start = Instant::now();
700        let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
701        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
702        res
703    }
704
705    /// Called to retrieve execution payload bodies by hashes (V2).
706    ///
707    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
708    pub async fn get_payload_bodies_by_hash_v2(
709        &self,
710        hashes: Vec<BlockHash>,
711    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
712        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV2 {
713            transactions: block.body().encoded_2718_transactions(),
714            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
715            block_access_list: None,
716        })
717        .await
718    }
719
720    /// Metrics version of `get_payload_bodies_by_hash_v2`
721    pub async fn get_payload_bodies_by_hash_v2_metered(
722        &self,
723        hashes: Vec<BlockHash>,
724    ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
725        let start = Instant::now();
726        let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
727        self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
728        res
729    }
730
731    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
732    /// update.
733    ///
734    /// The payload attributes will be validated according to the engine API rules for the given
735    /// message version:
736    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
737    ///   validated according to the Paris rules.
738    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
739    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
740    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
741    ///
742    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
743    ///   validated according to the Cancun rules.
744    async fn validate_and_execute_forkchoice(
745        &self,
746        version: EngineApiMessageVersion,
747        state: ForkchoiceState,
748        payload_attrs: Option<EngineT::PayloadAttributes>,
749    ) -> EngineApiResult<ForkchoiceUpdated> {
750        if let Some(ref attrs) = payload_attrs {
751            let attr_validation_res =
752                self.inner.validator.ensure_well_formed_attributes(version, attrs);
753
754            // From the engine API spec:
755            //
756            // Client software MUST ensure that payloadAttributes.timestamp is greater than
757            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
758            // isn't held client software MUST respond with -38003: Invalid payload attributes and
759            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
760            // update MUST NOT be rolled back.
761            //
762            // NOTE: This will also apply to the validation result for the cancun or
763            // shanghai-specific fields provided in the payload attributes.
764            //
765            // To do this, we set the payload attrs to `None` if attribute validation failed, but
766            // we still apply the forkchoice update.
767            if let Err(err) = attr_validation_res {
768                let fcu_res =
769                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
770                // TODO: decide if we want this branch - the FCU INVALID response might be more
771                // useful than the payload attributes INVALID response
772                if fcu_res.is_invalid() {
773                    return Ok(fcu_res)
774                }
775                return Err(err.into())
776            }
777        }
778
779        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
780    }
781
782    /// Returns reference to supported capabilities.
783    pub fn capabilities(&self) -> &EngineCapabilities {
784        &self.inner.capabilities
785    }
786
787    fn get_blobs_v1(
788        &self,
789        versioned_hashes: Vec<B256>,
790    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
791        // Only allow this method before Osaka fork
792        let current_timestamp =
793            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
794        if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
795            return Err(EngineApiError::EngineObjectValidationError(
796                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
797            ));
798        }
799
800        if versioned_hashes.len() > MAX_BLOB_LIMIT {
801            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
802        }
803
804        self.inner
805            .tx_pool
806            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
807            .map_err(|err| EngineApiError::Internal(Box::new(err)))
808    }
809
810    /// Metered version of `get_blobs_v1`.
811    pub fn get_blobs_v1_metered(
812        &self,
813        versioned_hashes: Vec<B256>,
814    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
815        let hashes_len = versioned_hashes.len();
816        let start = Instant::now();
817        let res = Self::get_blobs_v1(self, versioned_hashes);
818        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
819
820        if let Ok(blobs) = &res {
821            let blobs_found = blobs.iter().flatten().count();
822            let blobs_missed = hashes_len - blobs_found;
823
824            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
825            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
826        }
827
828        res
829    }
830
831    fn get_blobs_v2(
832        &self,
833        versioned_hashes: Vec<B256>,
834    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
835        // Check if Osaka fork is active
836        let current_timestamp =
837            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
838        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
839            return Err(EngineApiError::EngineObjectValidationError(
840                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
841            ));
842        }
843
844        if versioned_hashes.len() > MAX_BLOB_LIMIT {
845            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
846        }
847
848        self.inner
849            .tx_pool
850            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
851            .map_err(|err| EngineApiError::Internal(Box::new(err)))
852    }
853
854    fn get_blobs_v3(
855        &self,
856        versioned_hashes: Vec<B256>,
857    ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
858        // Check if Osaka fork is active
859        let current_timestamp =
860            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
861        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
862            return Err(EngineApiError::EngineObjectValidationError(
863                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
864            ));
865        }
866
867        if versioned_hashes.len() > MAX_BLOB_LIMIT {
868            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
869        }
870
871        // Spec requires returning `null` if syncing.
872        if (*self.inner.is_syncing)() {
873            return Ok(None)
874        }
875
876        self.inner
877            .tx_pool
878            .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
879            .map(Some)
880            .map_err(|err| EngineApiError::Internal(Box::new(err)))
881    }
882
883    /// Metered version of `get_blobs_v2`.
884    pub fn get_blobs_v2_metered(
885        &self,
886        versioned_hashes: Vec<B256>,
887    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
888        let hashes_len = versioned_hashes.len();
889        let start = Instant::now();
890        let res = Self::get_blobs_v2(self, versioned_hashes);
891        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
892
893        if let Ok(blobs) = &res {
894            let blobs_found = blobs.iter().flatten().count();
895
896            self.inner
897                .metrics
898                .blob_metrics
899                .get_blobs_requests_blobs_total
900                .increment(hashes_len as u64);
901            self.inner
902                .metrics
903                .blob_metrics
904                .get_blobs_requests_blobs_in_blobpool_total
905                .increment(blobs_found as u64);
906
907            if blobs_found == hashes_len {
908                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
909            } else {
910                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
911            }
912        } else {
913            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
914        }
915
916        res
917    }
918
919    /// Metered version of `get_blobs_v3`.
920    pub fn get_blobs_v3_metered(
921        &self,
922        versioned_hashes: Vec<B256>,
923    ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
924        let hashes_len = versioned_hashes.len();
925        let start = Instant::now();
926        let res = Self::get_blobs_v3(self, versioned_hashes);
927        self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
928
929        if let Ok(Some(blobs)) = &res {
930            let blobs_found = blobs.iter().flatten().count();
931            let blobs_missed = hashes_len - blobs_found;
932
933            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
934            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
935        }
936
937        res
938    }
939}
940
941// This is the concrete ethereum engine API implementation.
942#[async_trait]
943impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
944    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
945where
946    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
947    EngineT: EngineTypes<ExecutionData = ExecutionData>,
948    Pool: TransactionPool + 'static,
949    Validator: EngineApiValidator<EngineT>,
950    ChainSpec: EthereumHardforks + Send + Sync + 'static,
951{
952    /// Handler for `engine_newPayloadV1`
953    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
954    /// Caution: This should not accept the `withdrawals` field
955    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
956        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
957        let payload =
958            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
959        Ok(self.new_payload_v1_metered(payload).await?)
960    }
961
962    /// Handler for `engine_newPayloadV2`
963    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
964    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
965        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
966        let payload = ExecutionData {
967            payload: payload.into_payload(),
968            sidecar: ExecutionPayloadSidecar::none(),
969        };
970
971        Ok(self.new_payload_v2_metered(payload).await?)
972    }
973
974    /// Handler for `engine_newPayloadV3`
975    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
976    async fn new_payload_v3(
977        &self,
978        payload: ExecutionPayloadV3,
979        versioned_hashes: Vec<B256>,
980        parent_beacon_block_root: B256,
981    ) -> RpcResult<PayloadStatus> {
982        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
983        let payload = ExecutionData {
984            payload: payload.into(),
985            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
986                versioned_hashes,
987                parent_beacon_block_root,
988            }),
989        };
990
991        Ok(self.new_payload_v3_metered(payload).await?)
992    }
993
994    /// Handler for `engine_newPayloadV4`
995    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
996    async fn new_payload_v4(
997        &self,
998        payload: ExecutionPayloadV3,
999        versioned_hashes: Vec<B256>,
1000        parent_beacon_block_root: B256,
1001        requests: RequestsOrHash,
1002    ) -> RpcResult<PayloadStatus> {
1003        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1004
1005        // Accept requests as a hash only if it is explicitly allowed
1006        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1007            return Err(EngineApiError::UnexpectedRequestsHash.into());
1008        }
1009
1010        let payload = ExecutionData {
1011            payload: payload.into(),
1012            sidecar: ExecutionPayloadSidecar::v4(
1013                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1014                PraguePayloadFields { requests },
1015            ),
1016        };
1017
1018        Ok(self.new_payload_v4_metered(payload).await?)
1019    }
1020
1021    /// Handler for `engine_newPayloadV5`
1022    ///
1023    /// Post Amsterdam payload handler. Currently returns unsupported fork error.
1024    ///
1025    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_newpayloadv5>
1026    async fn new_payload_v5(
1027        &self,
1028        _payload: ExecutionPayloadV4,
1029        _versioned_hashes: Vec<B256>,
1030        _parent_beacon_block_root: B256,
1031        _execution_requests: RequestsOrHash,
1032    ) -> RpcResult<PayloadStatus> {
1033        trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1034        Err(EngineApiError::EngineObjectValidationError(
1035            reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1036        ))?
1037    }
1038
1039    /// Handler for `engine_forkchoiceUpdatedV1`
1040    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
1041    ///
1042    /// Caution: This should not accept the `withdrawals` field
1043    async fn fork_choice_updated_v1(
1044        &self,
1045        fork_choice_state: ForkchoiceState,
1046        payload_attributes: Option<EngineT::PayloadAttributes>,
1047    ) -> RpcResult<ForkchoiceUpdated> {
1048        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1049        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1050    }
1051
1052    /// Handler for `engine_forkchoiceUpdatedV2`
1053    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
1054    async fn fork_choice_updated_v2(
1055        &self,
1056        fork_choice_state: ForkchoiceState,
1057        payload_attributes: Option<EngineT::PayloadAttributes>,
1058    ) -> RpcResult<ForkchoiceUpdated> {
1059        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1060        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1061    }
1062
1063    /// Handler for `engine_forkchoiceUpdatedV3`
1064    ///
1065    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
1066    async fn fork_choice_updated_v3(
1067        &self,
1068        fork_choice_state: ForkchoiceState,
1069        payload_attributes: Option<EngineT::PayloadAttributes>,
1070    ) -> RpcResult<ForkchoiceUpdated> {
1071        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1072        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1073    }
1074
1075    /// Handler for `engine_getPayloadV1`
1076    ///
1077    /// Returns the most recent version of the payload that is available in the corresponding
1078    /// payload build process at the time of receiving this call.
1079    ///
1080    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
1081    ///
1082    /// Caution: This should not return the `withdrawals` field
1083    ///
1084    /// Note:
1085    /// > Provider software MAY stop the corresponding build process after serving this call.
1086    async fn get_payload_v1(
1087        &self,
1088        payload_id: PayloadId,
1089    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1090        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1091        Ok(self.get_payload_v1_metered(payload_id).await?)
1092    }
1093
1094    /// Handler for `engine_getPayloadV2`
1095    ///
1096    /// Returns the most recent version of the payload that is available in the corresponding
1097    /// payload build process at the time of receiving this call.
1098    ///
1099    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
1100    ///
1101    /// Note:
1102    /// > Provider software MAY stop the corresponding build process after serving this call.
1103    async fn get_payload_v2(
1104        &self,
1105        payload_id: PayloadId,
1106    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1107        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1108        Ok(self.get_payload_v2_metered(payload_id).await?)
1109    }
1110
1111    /// Handler for `engine_getPayloadV3`
1112    ///
1113    /// Returns the most recent version of the payload that is available in the corresponding
1114    /// payload build process at the time of receiving this call.
1115    ///
1116    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
1117    ///
1118    /// Note:
1119    /// > Provider software MAY stop the corresponding build process after serving this call.
1120    async fn get_payload_v3(
1121        &self,
1122        payload_id: PayloadId,
1123    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1124        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1125        Ok(self.get_payload_v3_metered(payload_id).await?)
1126    }
1127
1128    /// Handler for `engine_getPayloadV4`
1129    ///
1130    /// Returns the most recent version of the payload that is available in the corresponding
1131    /// payload build process at the time of receiving this call.
1132    ///
1133    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1134    ///
1135    /// Note:
1136    /// > Provider software MAY stop the corresponding build process after serving this call.
1137    async fn get_payload_v4(
1138        &self,
1139        payload_id: PayloadId,
1140    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1141        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1142        Ok(self.get_payload_v4_metered(payload_id).await?)
1143    }
1144
1145    /// Handler for `engine_getPayloadV5`
1146    ///
1147    /// Returns the most recent version of the payload that is available in the corresponding
1148    /// payload build process at the time of receiving this call.
1149    ///
1150    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1151    ///
1152    /// Note:
1153    /// > Provider software MAY stop the corresponding build process after serving this call.
1154    async fn get_payload_v5(
1155        &self,
1156        payload_id: PayloadId,
1157    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1158        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1159        Ok(self.get_payload_v5_metered(payload_id).await?)
1160    }
1161
1162    /// Handler for `engine_getPayloadV6`
1163    ///
1164    /// Post Amsterdam payload handler. Currently returns unsupported fork error.
1165    ///
1166    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/amsterdam.md#engine_getpayloadv6>
1167    async fn get_payload_v6(
1168        &self,
1169        _payload_id: PayloadId,
1170    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1171        trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1172        Err(EngineApiError::EngineObjectValidationError(
1173            reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1174        ))?
1175    }
1176
1177    /// Handler for `engine_getPayloadBodiesByHashV1`
1178    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1179    async fn get_payload_bodies_by_hash_v1(
1180        &self,
1181        block_hashes: Vec<BlockHash>,
1182    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1183        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1184        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1185    }
1186
1187    /// Handler for `engine_getPayloadBodiesByHashV2`
1188    ///
1189    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
1190    ///
1191    /// See also <https://eips.ethereum.org/EIPS/eip-7928>
1192    async fn get_payload_bodies_by_hash_v2(
1193        &self,
1194        block_hashes: Vec<BlockHash>,
1195    ) -> RpcResult<ExecutionPayloadBodiesV2> {
1196        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1197        Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1198    }
1199
1200    /// Handler for `engine_getPayloadBodiesByRangeV1`
1201    ///
1202    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1203    ///
1204    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1205    /// blocks.
1206    ///
1207    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
1208    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1209    /// adversarial.
1210    ///
1211    /// Implementers should take care when acting on the input to this method, specifically
1212    /// ensuring that the range is limited properly, and that the range boundaries are computed
1213    /// correctly and without panics.
1214    ///
1215    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1216    async fn get_payload_bodies_by_range_v1(
1217        &self,
1218        start: U64,
1219        count: U64,
1220    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1221        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1222        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1223    }
1224
1225    /// Handler for `engine_getPayloadBodiesByRangeV2`
1226    ///
1227    /// V2 includes the `block_access_list` field for EIP-7928 BAL support.
1228    ///
1229    /// See also <https://eips.ethereum.org/EIPS/eip-7928>
1230    async fn get_payload_bodies_by_range_v2(
1231        &self,
1232        start: U64,
1233        count: U64,
1234    ) -> RpcResult<ExecutionPayloadBodiesV2> {
1235        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1236        Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1237    }
1238
1239    /// Handler for `engine_getClientVersionV1`
1240    ///
1241    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1242    async fn get_client_version_v1(
1243        &self,
1244        client: ClientVersionV1,
1245    ) -> RpcResult<Vec<ClientVersionV1>> {
1246        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1247        Ok(Self::get_client_version_v1(self, client)?)
1248    }
1249
1250    /// Handler for `engine_exchangeCapabilitiesV1`
1251    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1252    async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1253        trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1254
1255        let el_caps = self.capabilities();
1256        el_caps.log_capability_mismatches(&capabilities);
1257
1258        Ok(el_caps.list())
1259    }
1260
1261    async fn get_blobs_v1(
1262        &self,
1263        versioned_hashes: Vec<B256>,
1264    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1265        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1266        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1267    }
1268
1269    async fn get_blobs_v2(
1270        &self,
1271        versioned_hashes: Vec<B256>,
1272    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1273        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1274        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1275    }
1276
1277    async fn get_blobs_v3(
1278        &self,
1279        versioned_hashes: Vec<B256>,
1280    ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1281        trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1282        Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1283    }
1284}
1285
1286impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1287    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1288where
1289    EngineT: EngineTypes,
1290    Self: EngineApiServer<EngineT>,
1291{
1292    fn into_rpc_module(self) -> RpcModule<()> {
1293        self.into_rpc().remove_context()
1294    }
1295}
1296
1297impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1298    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1299where
1300    PayloadT: PayloadTypes,
1301{
1302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1303        f.debug_struct("EngineApi").finish_non_exhaustive()
1304    }
1305}
1306
1307impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1308    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1309where
1310    PayloadT: PayloadTypes,
1311{
1312    fn clone(&self) -> Self {
1313        Self { inner: Arc::clone(&self.inner) }
1314    }
1315}
1316
1317/// The container type for the engine API internals.
1318struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1319    /// The provider to interact with the chain.
1320    provider: Provider,
1321    /// Consensus configuration
1322    chain_spec: Arc<ChainSpec>,
1323    /// The channel to send messages to the beacon consensus engine.
1324    beacon_consensus: ConsensusEngineHandle<PayloadT>,
1325    /// The type that can communicate with the payload service to retrieve payloads.
1326    payload_store: PayloadStore<PayloadT>,
1327    /// For spawning and executing async tasks
1328    task_spawner: Box<dyn TaskSpawner>,
1329    /// The latency and response type metrics for engine api calls
1330    metrics: EngineApiMetrics,
1331    /// Identification of the execution client used by the consensus client
1332    client: ClientVersionV1,
1333    /// The list of all supported Engine capabilities available over the engine endpoint.
1334    capabilities: EngineCapabilities,
1335    /// Transaction pool.
1336    tx_pool: Pool,
1337    /// Engine validator.
1338    validator: Validator,
1339    accept_execution_requests_hash: bool,
1340    /// Returns `true` if the node is currently syncing.
1341    is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1342}
1343
1344#[cfg(test)]
1345mod tests {
1346    use super::*;
1347    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1348    use assert_matches::assert_matches;
1349    use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1350    use reth_engine_primitives::BeaconEngineMessage;
1351    use reth_ethereum_engine_primitives::EthEngineTypes;
1352    use reth_ethereum_primitives::Block;
1353    use reth_network_api::{
1354        noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1355    };
1356    use reth_node_ethereum::EthereumEngineValidator;
1357    use reth_payload_builder::test_utils::spawn_test_payload_service;
1358    use reth_provider::test_utils::MockEthProvider;
1359    use reth_tasks::TokioTaskExecutor;
1360    use reth_transaction_pool::noop::NoopTransactionPool;
1361    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1362
1363    fn setup_engine_api() -> (
1364        EngineApiTestHandle,
1365        EngineApi<
1366            Arc<MockEthProvider>,
1367            EthEngineTypes,
1368            NoopTransactionPool,
1369            EthereumEngineValidator,
1370            ChainSpec,
1371        >,
1372    ) {
1373        let client = ClientVersionV1 {
1374            code: ClientCode::RH,
1375            name: "Reth".to_string(),
1376            version: "v0.2.0-beta.5".to_string(),
1377            commit: "defa64b2".to_string(),
1378        };
1379
1380        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1381        let provider = Arc::new(MockEthProvider::default());
1382        let payload_store = spawn_test_payload_service();
1383        let (to_engine, engine_rx) = unbounded_channel();
1384        let task_executor = Box::<TokioTaskExecutor>::default();
1385        let api = EngineApi::new(
1386            provider.clone(),
1387            chain_spec.clone(),
1388            ConsensusEngineHandle::new(to_engine),
1389            payload_store.into(),
1390            NoopTransactionPool::default(),
1391            task_executor,
1392            client,
1393            EngineCapabilities::default(),
1394            EthereumEngineValidator::new(chain_spec.clone()),
1395            false,
1396            NoopNetwork::default(),
1397        );
1398        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1399        (handle, api)
1400    }
1401
1402    #[tokio::test]
1403    async fn engine_client_version_v1() {
1404        let client = ClientVersionV1 {
1405            code: ClientCode::RH,
1406            name: "Reth".to_string(),
1407            version: "v0.2.0-beta.5".to_string(),
1408            commit: "defa64b2".to_string(),
1409        };
1410        let (_, api) = setup_engine_api();
1411        let res = api.get_client_version_v1(client.clone());
1412        assert_eq!(res.unwrap(), vec![client]);
1413    }
1414
1415    struct EngineApiTestHandle {
1416        #[allow(dead_code)]
1417        chain_spec: Arc<ChainSpec>,
1418        provider: Arc<MockEthProvider>,
1419        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1420    }
1421
1422    #[tokio::test]
1423    async fn forwards_responses_to_consensus_engine() {
1424        let (mut handle, api) = setup_engine_api();
1425
1426        tokio::spawn(async move {
1427            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1428            let execution_data = ExecutionData {
1429                payload: payload_v1.into(),
1430                sidecar: ExecutionPayloadSidecar::none(),
1431            };
1432
1433            api.new_payload_v1(execution_data).await.unwrap();
1434        });
1435        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1436    }
1437
1438    #[derive(Clone)]
1439    struct TestNetworkInfo {
1440        syncing: bool,
1441    }
1442
1443    impl NetworkInfo for TestNetworkInfo {
1444        fn local_addr(&self) -> std::net::SocketAddr {
1445            (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1446        }
1447
1448        async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1449            #[allow(deprecated)]
1450            Ok(NetworkStatus {
1451                client_version: "test".to_string(),
1452                protocol_version: 5,
1453                eth_protocol_info: EthProtocolInfo {
1454                    network: 1,
1455                    difficulty: None,
1456                    genesis: Default::default(),
1457                    config: Default::default(),
1458                    head: Default::default(),
1459                },
1460                capabilities: vec![],
1461            })
1462        }
1463
1464        fn chain_id(&self) -> u64 {
1465            1
1466        }
1467
1468        fn is_syncing(&self) -> bool {
1469            self.syncing
1470        }
1471
1472        fn is_initially_syncing(&self) -> bool {
1473            self.syncing
1474        }
1475    }
1476
1477    #[tokio::test]
1478    async fn get_blobs_v3_returns_null_when_syncing() {
1479        let chain_spec: Arc<ChainSpec> =
1480            Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1481        let provider = Arc::new(MockEthProvider::default());
1482        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1483        let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1484
1485        let api = EngineApi::new(
1486            provider,
1487            chain_spec.clone(),
1488            ConsensusEngineHandle::new(to_engine),
1489            payload_store.into(),
1490            NoopTransactionPool::default(),
1491            Box::<TokioTaskExecutor>::default(),
1492            ClientVersionV1 {
1493                code: ClientCode::RH,
1494                name: "Reth".to_string(),
1495                version: "v0.0.0-test".to_string(),
1496                commit: "test".to_string(),
1497            },
1498            EngineCapabilities::default(),
1499            EthereumEngineValidator::new(chain_spec),
1500            false,
1501            TestNetworkInfo { syncing: true },
1502        );
1503
1504        let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1505        assert_matches!(res, Ok(None));
1506    }
1507
1508    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
1509    mod get_payload_bodies {
1510        use super::*;
1511        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1512        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1513
1514        #[tokio::test]
1515        async fn invalid_params() {
1516            let (_, api) = setup_engine_api();
1517
1518            let by_range_tests = [
1519                // (start, count)
1520                (0, 0),
1521                (0, 1),
1522                (1, 0),
1523            ];
1524
1525            // test [EngineApiMessage::GetPayloadBodiesByRange]
1526            for (start, count) in by_range_tests {
1527                let res = api.get_payload_bodies_by_range_v1(start, count).await;
1528                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1529            }
1530        }
1531
1532        #[tokio::test]
1533        async fn request_too_large() {
1534            let (_, api) = setup_engine_api();
1535
1536            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1537            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1538            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1539        }
1540
1541        #[tokio::test]
1542        async fn returns_payload_bodies() {
1543            let mut rng = generators::rng();
1544            let (handle, api) = setup_engine_api();
1545
1546            let (start, count) = (1, 10);
1547            let blocks = random_block_range(
1548                &mut rng,
1549                start..=start + count - 1,
1550                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1551            );
1552            handle
1553                .provider
1554                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1555
1556            let expected = blocks
1557                .iter()
1558                .cloned()
1559                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1560                .collect::<Vec<_>>();
1561
1562            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1563            assert_eq!(res, expected);
1564        }
1565
1566        #[tokio::test]
1567        async fn returns_payload_bodies_with_gaps() {
1568            let mut rng = generators::rng();
1569            let (handle, api) = setup_engine_api();
1570
1571            let (start, count) = (1, 100);
1572            let blocks = random_block_range(
1573                &mut rng,
1574                start..=start + count - 1,
1575                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1576            );
1577
1578            // Insert only blocks in ranges 1-25 and 50-75
1579            let first_missing_range = 26..=50;
1580            let second_missing_range = 76..=100;
1581            handle.provider.extend_blocks(
1582                blocks
1583                    .iter()
1584                    .filter(|b| {
1585                        !first_missing_range.contains(&b.number) &&
1586                            !second_missing_range.contains(&b.number)
1587                    })
1588                    .map(|b| (b.hash(), b.clone().into_block())),
1589            );
1590
1591            let expected = blocks
1592                .iter()
1593                // filter anything after the second missing range to ensure we don't expect trailing
1594                // `None`s
1595                .filter(|b| !second_missing_range.contains(&b.number))
1596                .cloned()
1597                .map(|b| {
1598                    if first_missing_range.contains(&b.number) {
1599                        None
1600                    } else {
1601                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1602                    }
1603                })
1604                .collect::<Vec<_>>();
1605
1606            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1607            assert_eq!(res, expected);
1608
1609            let expected = blocks
1610                .iter()
1611                .cloned()
1612                // ensure we still return trailing `None`s here because by-hash will not be aware
1613                // of the missing block's number, and cannot compare it to the current best block
1614                .map(|b| {
1615                    if first_missing_range.contains(&b.number) ||
1616                        second_missing_range.contains(&b.number)
1617                    {
1618                        None
1619                    } else {
1620                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1621                    }
1622                })
1623                .collect::<Vec<_>>();
1624
1625            let hashes = blocks.iter().map(|b| b.hash()).collect();
1626            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1627            assert_eq!(res, expected);
1628        }
1629    }
1630}