reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use reth_chainspec::EthereumHardforks;
20use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
21use reth_network_api::NetworkInfo;
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24    validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
25    PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{
33    sync::Arc,
34    time::{Instant, SystemTime},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace, warn};
38
39/// The Engine API response sender.
40pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
41
42/// The upper limit for payload bodies request.
43const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
44
45/// The upper limit for blobs in `engine_getBlobsVx`.
46const MAX_BLOB_LIMIT: usize = 128;
47
48/// The Engine API implementation that grants the Consensus layer access to data and
49/// functions in the Execution layer that are crucial for the consensus process.
50///
51/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
52/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
53/// are still follow the engine API model.
54///
55/// ## Implementers
56///
57/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
58/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
59/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
60/// endpoints (e.g. opstack).
61/// See also [`EngineApiServer`] implementation for this type which is the
62/// L1 implementation.
63pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
64    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
65}
66
67impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
68    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
69{
70    /// Returns the configured chainspec.
71    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
72        &self.inner.chain_spec
73    }
74}
75
76impl<Provider, PayloadT, Pool, Validator, ChainSpec>
77    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
78where
79    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
80    PayloadT: PayloadTypes,
81    Pool: TransactionPool + 'static,
82    Validator: EngineApiValidator<PayloadT>,
83    ChainSpec: EthereumHardforks + Send + Sync + 'static,
84{
85    /// Create new instance of [`EngineApi`].
86    #[expect(clippy::too_many_arguments)]
87    pub fn new(
88        provider: Provider,
89        chain_spec: Arc<ChainSpec>,
90        beacon_consensus: ConsensusEngineHandle<PayloadT>,
91        payload_store: PayloadStore<PayloadT>,
92        tx_pool: Pool,
93        task_spawner: Box<dyn TaskSpawner>,
94        client: ClientVersionV1,
95        capabilities: EngineCapabilities,
96        validator: Validator,
97        accept_execution_requests_hash: bool,
98        network: impl NetworkInfo + 'static,
99    ) -> Self {
100        let is_syncing = Arc::new(move || network.is_syncing());
101        let inner = Arc::new(EngineApiInner {
102            provider,
103            chain_spec,
104            beacon_consensus,
105            payload_store,
106            task_spawner,
107            metrics: EngineApiMetrics::default(),
108            client,
109            capabilities,
110            tx_pool,
111            validator,
112            accept_execution_requests_hash,
113            is_syncing,
114        });
115        Self { inner }
116    }
117
118    /// Fetches the client version.
119    pub fn get_client_version_v1(
120        &self,
121        _client: ClientVersionV1,
122    ) -> EngineApiResult<Vec<ClientVersionV1>> {
123        Ok(vec![self.inner.client.clone()])
124    }
125
126    /// Fetches the timestamp of the payload with the given id.
127    async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
128        Ok(self
129            .inner
130            .payload_store
131            .payload_timestamp(payload_id)
132            .await
133            .ok_or(EngineApiError::UnknownPayload)??)
134    }
135
136    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
137    /// Caution: This should not accept the `withdrawals` field
138    pub async fn new_payload_v1(
139        &self,
140        payload: PayloadT::ExecutionData,
141    ) -> EngineApiResult<PayloadStatus> {
142        let payload_or_attrs = PayloadOrAttributes::<
143            '_,
144            PayloadT::ExecutionData,
145            PayloadT::PayloadAttributes,
146        >::from_execution_payload(&payload);
147
148        self.inner
149            .validator
150            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
151
152        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
153    }
154
155    /// Metered version of `new_payload_v1`.
156    pub async fn new_payload_v1_metered(
157        &self,
158        payload: PayloadT::ExecutionData,
159    ) -> EngineApiResult<PayloadStatus> {
160        let start = Instant::now();
161        let res = Self::new_payload_v1(self, payload).await;
162        let elapsed = start.elapsed();
163        self.inner.metrics.latency.new_payload_v1.record(elapsed);
164        res
165    }
166
167    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
168    pub async fn new_payload_v2(
169        &self,
170        payload: PayloadT::ExecutionData,
171    ) -> EngineApiResult<PayloadStatus> {
172        let payload_or_attrs = PayloadOrAttributes::<
173            '_,
174            PayloadT::ExecutionData,
175            PayloadT::PayloadAttributes,
176        >::from_execution_payload(&payload);
177        self.inner
178            .validator
179            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
180        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
181    }
182
183    /// Metered version of `new_payload_v2`.
184    pub async fn new_payload_v2_metered(
185        &self,
186        payload: PayloadT::ExecutionData,
187    ) -> EngineApiResult<PayloadStatus> {
188        let start = Instant::now();
189        let res = Self::new_payload_v2(self, payload).await;
190        let elapsed = start.elapsed();
191        self.inner.metrics.latency.new_payload_v2.record(elapsed);
192        res
193    }
194
195    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
196    pub async fn new_payload_v3(
197        &self,
198        payload: PayloadT::ExecutionData,
199    ) -> EngineApiResult<PayloadStatus> {
200        let payload_or_attrs = PayloadOrAttributes::<
201            '_,
202            PayloadT::ExecutionData,
203            PayloadT::PayloadAttributes,
204        >::from_execution_payload(&payload);
205        self.inner
206            .validator
207            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
208
209        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
210    }
211
212    /// Metrics version of `new_payload_v3`
213    pub async fn new_payload_v3_metered(
214        &self,
215        payload: PayloadT::ExecutionData,
216    ) -> RpcResult<PayloadStatus> {
217        let start = Instant::now();
218
219        let res = Self::new_payload_v3(self, payload).await;
220        let elapsed = start.elapsed();
221        self.inner.metrics.latency.new_payload_v3.record(elapsed);
222        Ok(res?)
223    }
224
225    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
226    pub async fn new_payload_v4(
227        &self,
228        payload: PayloadT::ExecutionData,
229    ) -> EngineApiResult<PayloadStatus> {
230        let payload_or_attrs = PayloadOrAttributes::<
231            '_,
232            PayloadT::ExecutionData,
233            PayloadT::PayloadAttributes,
234        >::from_execution_payload(&payload);
235        self.inner
236            .validator
237            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
238
239        Ok(self.inner.beacon_consensus.new_payload(payload).await?)
240    }
241
242    /// Metrics version of `new_payload_v4`
243    pub async fn new_payload_v4_metered(
244        &self,
245        payload: PayloadT::ExecutionData,
246    ) -> RpcResult<PayloadStatus> {
247        let start = Instant::now();
248        let res = Self::new_payload_v4(self, payload).await;
249
250        let elapsed = start.elapsed();
251        self.inner.metrics.latency.new_payload_v4.record(elapsed);
252        Ok(res?)
253    }
254
255    /// Returns whether the engine accepts execution requests hash.
256    pub fn accept_execution_requests_hash(&self) -> bool {
257        self.inner.accept_execution_requests_hash
258    }
259}
260
261impl<Provider, EngineT, Pool, Validator, ChainSpec>
262    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
263where
264    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
265    EngineT: EngineTypes,
266    Pool: TransactionPool + 'static,
267    Validator: EngineApiValidator<EngineT>,
268    ChainSpec: EthereumHardforks + Send + Sync + 'static,
269{
270    /// Sends a message to the beacon consensus engine to update the fork choice _without_
271    /// withdrawals.
272    ///
273    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
274    ///
275    /// Caution: This should not accept the `withdrawals` field
276    pub async fn fork_choice_updated_v1(
277        &self,
278        state: ForkchoiceState,
279        payload_attrs: Option<EngineT::PayloadAttributes>,
280    ) -> EngineApiResult<ForkchoiceUpdated> {
281        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
282            .await
283    }
284
285    /// Metrics version of `fork_choice_updated_v1`
286    pub async fn fork_choice_updated_v1_metered(
287        &self,
288        state: ForkchoiceState,
289        payload_attrs: Option<EngineT::PayloadAttributes>,
290    ) -> EngineApiResult<ForkchoiceUpdated> {
291        let start = Instant::now();
292        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
293        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
294        res
295    }
296
297    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
298    /// but only _after_ shanghai.
299    ///
300    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
301    pub async fn fork_choice_updated_v2(
302        &self,
303        state: ForkchoiceState,
304        payload_attrs: Option<EngineT::PayloadAttributes>,
305    ) -> EngineApiResult<ForkchoiceUpdated> {
306        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
307            .await
308    }
309
310    /// Metrics version of `fork_choice_updated_v2`
311    pub async fn fork_choice_updated_v2_metered(
312        &self,
313        state: ForkchoiceState,
314        payload_attrs: Option<EngineT::PayloadAttributes>,
315    ) -> EngineApiResult<ForkchoiceUpdated> {
316        let start = Instant::now();
317        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
318        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
319        res
320    }
321
322    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
323    /// but only _after_ cancun.
324    ///
325    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
326    pub async fn fork_choice_updated_v3(
327        &self,
328        state: ForkchoiceState,
329        payload_attrs: Option<EngineT::PayloadAttributes>,
330    ) -> EngineApiResult<ForkchoiceUpdated> {
331        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
332            .await
333    }
334
335    /// Metrics version of `fork_choice_updated_v3`
336    pub async fn fork_choice_updated_v3_metered(
337        &self,
338        state: ForkchoiceState,
339        payload_attrs: Option<EngineT::PayloadAttributes>,
340    ) -> EngineApiResult<ForkchoiceUpdated> {
341        let start = Instant::now();
342        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
343        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
344        res
345    }
346
347    /// Helper function for retrieving the build payload by id.
348    async fn get_built_payload(
349        &self,
350        payload_id: PayloadId,
351    ) -> EngineApiResult<EngineT::BuiltPayload> {
352        self.inner
353            .payload_store
354            .resolve(payload_id)
355            .await
356            .ok_or(EngineApiError::UnknownPayload)?
357            .map_err(|_| EngineApiError::UnknownPayload)
358    }
359
360    /// Helper function for validating the payload timestamp and retrieving & converting the payload
361    /// into desired envelope.
362    async fn get_payload_inner<R>(
363        &self,
364        payload_id: PayloadId,
365        version: EngineApiMessageVersion,
366    ) -> EngineApiResult<R>
367    where
368        EngineT::BuiltPayload: TryInto<R>,
369    {
370        // Validate timestamp according to engine rules
371        // Enforces Osaka restrictions on `getPayloadV4`.
372        let timestamp = self.get_payload_timestamp(payload_id).await?;
373        validate_payload_timestamp(
374            &self.inner.chain_spec,
375            version,
376            timestamp,
377            MessageValidationKind::GetPayload,
378        )?;
379
380        // Now resolve the payload
381        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
382            warn!(?version, "could not transform built payload");
383            EngineApiError::UnknownPayload
384        })
385    }
386
387    /// Returns the most recent version of the payload that is available in the corresponding
388    /// payload build process at the time of receiving this call.
389    ///
390    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
391    ///
392    /// Caution: This should not return the `withdrawals` field
393    ///
394    /// Note:
395    /// > Provider software MAY stop the corresponding build process after serving this call.
396    pub async fn get_payload_v1(
397        &self,
398        payload_id: PayloadId,
399    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
400        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
401            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
402            EngineApiError::UnknownPayload
403        })
404    }
405
406    /// Metrics version of `get_payload_v1`
407    pub async fn get_payload_v1_metered(
408        &self,
409        payload_id: PayloadId,
410    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
411        let start = Instant::now();
412        let res = Self::get_payload_v1(self, payload_id).await;
413        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
414        res
415    }
416
417    /// Returns the most recent version of the payload that is available in the corresponding
418    /// payload build process at the time of receiving this call.
419    ///
420    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
421    ///
422    /// Note:
423    /// > Provider software MAY stop the corresponding build process after serving this call.
424    pub async fn get_payload_v2(
425        &self,
426        payload_id: PayloadId,
427    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
428        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
429    }
430
431    /// Metrics version of `get_payload_v2`
432    pub async fn get_payload_v2_metered(
433        &self,
434        payload_id: PayloadId,
435    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
436        let start = Instant::now();
437        let res = Self::get_payload_v2(self, payload_id).await;
438        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
439        res
440    }
441
442    /// Returns the most recent version of the payload that is available in the corresponding
443    /// payload build process at the time of receiving this call.
444    ///
445    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
446    ///
447    /// Note:
448    /// > Provider software MAY stop the corresponding build process after serving this call.
449    pub async fn get_payload_v3(
450        &self,
451        payload_id: PayloadId,
452    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
453        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
454    }
455
456    /// Metrics version of `get_payload_v3`
457    pub async fn get_payload_v3_metered(
458        &self,
459        payload_id: PayloadId,
460    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
461        let start = Instant::now();
462        let res = Self::get_payload_v3(self, payload_id).await;
463        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
464        res
465    }
466
467    /// Returns the most recent version of the payload that is available in the corresponding
468    /// payload build process at the time of receiving this call.
469    ///
470    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_getpayloadv4>
471    ///
472    /// Note:
473    /// > Provider software MAY stop the corresponding build process after serving this call.
474    pub async fn get_payload_v4(
475        &self,
476        payload_id: PayloadId,
477    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
478        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
479    }
480
481    /// Metrics version of `get_payload_v4`
482    pub async fn get_payload_v4_metered(
483        &self,
484        payload_id: PayloadId,
485    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
486        let start = Instant::now();
487        let res = Self::get_payload_v4(self, payload_id).await;
488        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
489        res
490    }
491
492    /// Handler for `engine_getPayloadV5`
493    ///
494    /// Returns the most recent version of the payload that is available in the corresponding
495    /// payload build process at the time of receiving this call.
496    ///
497    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
498    ///
499    /// Note:
500    /// > Provider software MAY stop the corresponding build process after serving this call.
501    pub async fn get_payload_v5(
502        &self,
503        payload_id: PayloadId,
504    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
505        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
506    }
507
508    /// Metrics version of `get_payload_v5`
509    pub async fn get_payload_v5_metered(
510        &self,
511        payload_id: PayloadId,
512    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
513        let start = Instant::now();
514        let res = Self::get_payload_v5(self, payload_id).await;
515        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
516        res
517    }
518
519    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
520    /// blocks and returns the mapped payload bodies.
521    pub async fn get_payload_bodies_by_range_with<F, R>(
522        &self,
523        start: BlockNumber,
524        count: u64,
525        f: F,
526    ) -> EngineApiResult<Vec<Option<R>>>
527    where
528        F: Fn(Provider::Block) -> R + Send + 'static,
529        R: Send + 'static,
530    {
531        let (tx, rx) = oneshot::channel();
532        let inner = self.inner.clone();
533
534        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
535            if count > MAX_PAYLOAD_BODIES_LIMIT {
536                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
537                return;
538            }
539
540            if start == 0 || count == 0 {
541                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
542                return;
543            }
544
545            let mut result = Vec::with_capacity(count as usize);
546
547            // -1 so range is inclusive
548            let mut end = start.saturating_add(count - 1);
549
550            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
551            // truncate the end if it's greater than the last block
552            if let Ok(best_block) = inner.provider.best_block_number()
553                && end > best_block {
554                    end = best_block;
555                }
556
557            // Check if the requested range starts before the earliest available block due to pruning/expiry
558            let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
559            for num in start..=end {
560                if num < earliest_block {
561                    result.push(None);
562                    continue;
563                }
564                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
565                match block_result {
566                    Ok(block) => {
567                        result.push(block.map(&f));
568                    }
569                    Err(err) => {
570                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
571                        return;
572                    }
573                };
574            }
575            tx.send(Ok(result)).ok();
576        }));
577
578        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
579    }
580
581    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
582    /// blocks.
583    ///
584    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
585    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
586    /// adversarial.
587    ///
588    /// Implementers should take care when acting on the input to this method, specifically
589    /// ensuring that the range is limited properly, and that the range boundaries are computed
590    /// correctly and without panics.
591    pub async fn get_payload_bodies_by_range_v1(
592        &self,
593        start: BlockNumber,
594        count: u64,
595    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
596        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
597            transactions: block.body().encoded_2718_transactions(),
598            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
599        })
600        .await
601    }
602
603    /// Metrics version of `get_payload_bodies_by_range_v1`
604    pub async fn get_payload_bodies_by_range_v1_metered(
605        &self,
606        start: BlockNumber,
607        count: u64,
608    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
609        let start_time = Instant::now();
610        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
611        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
612        res
613    }
614
615    /// Called to retrieve execution payload bodies by hashes.
616    pub async fn get_payload_bodies_by_hash_with<F, R>(
617        &self,
618        hashes: Vec<BlockHash>,
619        f: F,
620    ) -> EngineApiResult<Vec<Option<R>>>
621    where
622        F: Fn(Provider::Block) -> R + Send + 'static,
623        R: Send + 'static,
624    {
625        let len = hashes.len() as u64;
626        if len > MAX_PAYLOAD_BODIES_LIMIT {
627            return Err(EngineApiError::PayloadRequestTooLarge { len });
628        }
629
630        let (tx, rx) = oneshot::channel();
631        let inner = self.inner.clone();
632
633        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
634            let mut result = Vec::with_capacity(hashes.len());
635            for hash in hashes {
636                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
637                match block_result {
638                    Ok(block) => {
639                        result.push(block.map(&f));
640                    }
641                    Err(err) => {
642                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
643                        return;
644                    }
645                }
646            }
647            tx.send(Ok(result)).ok();
648        }));
649
650        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
651    }
652
653    /// Called to retrieve execution payload bodies by hashes.
654    pub async fn get_payload_bodies_by_hash_v1(
655        &self,
656        hashes: Vec<BlockHash>,
657    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
658        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
659            transactions: block.body().encoded_2718_transactions(),
660            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
661        })
662        .await
663    }
664
665    /// Metrics version of `get_payload_bodies_by_hash_v1`
666    pub async fn get_payload_bodies_by_hash_v1_metered(
667        &self,
668        hashes: Vec<BlockHash>,
669    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
670        let start = Instant::now();
671        let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
672        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
673        res
674    }
675
676    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
677    /// update.
678    ///
679    /// The payload attributes will be validated according to the engine API rules for the given
680    /// message version:
681    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
682    ///   validated according to the Paris rules.
683    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
684    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
685    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
686    ///
687    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
688    ///   validated according to the Cancun rules.
689    async fn validate_and_execute_forkchoice(
690        &self,
691        version: EngineApiMessageVersion,
692        state: ForkchoiceState,
693        payload_attrs: Option<EngineT::PayloadAttributes>,
694    ) -> EngineApiResult<ForkchoiceUpdated> {
695        if let Some(ref attrs) = payload_attrs {
696            let attr_validation_res =
697                self.inner.validator.ensure_well_formed_attributes(version, attrs);
698
699            // From the engine API spec:
700            //
701            // Client software MUST ensure that payloadAttributes.timestamp is greater than
702            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
703            // isn't held client software MUST respond with -38003: Invalid payload attributes and
704            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
705            // update MUST NOT be rolled back.
706            //
707            // NOTE: This will also apply to the validation result for the cancun or
708            // shanghai-specific fields provided in the payload attributes.
709            //
710            // To do this, we set the payload attrs to `None` if attribute validation failed, but
711            // we still apply the forkchoice update.
712            if let Err(err) = attr_validation_res {
713                let fcu_res =
714                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
715                // TODO: decide if we want this branch - the FCU INVALID response might be more
716                // useful than the payload attributes INVALID response
717                if fcu_res.is_invalid() {
718                    return Ok(fcu_res)
719                }
720                return Err(err.into())
721            }
722        }
723
724        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
725    }
726
727    /// Returns reference to supported capabilities.
728    pub fn capabilities(&self) -> &EngineCapabilities {
729        &self.inner.capabilities
730    }
731
732    fn get_blobs_v1(
733        &self,
734        versioned_hashes: Vec<B256>,
735    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
736        // Only allow this method before Osaka fork
737        let current_timestamp =
738            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
739        if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
740            return Err(EngineApiError::EngineObjectValidationError(
741                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
742            ));
743        }
744
745        if versioned_hashes.len() > MAX_BLOB_LIMIT {
746            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
747        }
748
749        self.inner
750            .tx_pool
751            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
752            .map_err(|err| EngineApiError::Internal(Box::new(err)))
753    }
754
755    /// Metered version of `get_blobs_v1`.
756    pub fn get_blobs_v1_metered(
757        &self,
758        versioned_hashes: Vec<B256>,
759    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
760        let hashes_len = versioned_hashes.len();
761        let start = Instant::now();
762        let res = Self::get_blobs_v1(self, versioned_hashes);
763        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
764
765        if let Ok(blobs) = &res {
766            let blobs_found = blobs.iter().flatten().count();
767            let blobs_missed = hashes_len - blobs_found;
768
769            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
770            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
771        }
772
773        res
774    }
775
776    fn get_blobs_v2(
777        &self,
778        versioned_hashes: Vec<B256>,
779    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
780        // Check if Osaka fork is active
781        let current_timestamp =
782            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
783        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
784            return Err(EngineApiError::EngineObjectValidationError(
785                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
786            ));
787        }
788
789        if versioned_hashes.len() > MAX_BLOB_LIMIT {
790            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
791        }
792
793        self.inner
794            .tx_pool
795            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
796            .map_err(|err| EngineApiError::Internal(Box::new(err)))
797    }
798
799    fn get_blobs_v3(
800        &self,
801        versioned_hashes: Vec<B256>,
802    ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
803        // Check if Osaka fork is active
804        let current_timestamp =
805            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
806        if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
807            return Err(EngineApiError::EngineObjectValidationError(
808                reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
809            ));
810        }
811
812        if versioned_hashes.len() > MAX_BLOB_LIMIT {
813            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
814        }
815
816        // Spec requires returning `null` if syncing.
817        if (*self.inner.is_syncing)() {
818            return Ok(None)
819        }
820
821        self.inner
822            .tx_pool
823            .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
824            .map(Some)
825            .map_err(|err| EngineApiError::Internal(Box::new(err)))
826    }
827
828    /// Metered version of `get_blobs_v2`.
829    pub fn get_blobs_v2_metered(
830        &self,
831        versioned_hashes: Vec<B256>,
832    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
833        let hashes_len = versioned_hashes.len();
834        let start = Instant::now();
835        let res = Self::get_blobs_v2(self, versioned_hashes);
836        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
837
838        if let Ok(blobs) = &res {
839            let blobs_found = blobs.iter().flatten().count();
840
841            self.inner
842                .metrics
843                .blob_metrics
844                .get_blobs_requests_blobs_total
845                .increment(hashes_len as u64);
846            self.inner
847                .metrics
848                .blob_metrics
849                .get_blobs_requests_blobs_in_blobpool_total
850                .increment(blobs_found as u64);
851
852            if blobs_found == hashes_len {
853                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
854            } else {
855                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
856            }
857        } else {
858            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
859        }
860
861        res
862    }
863
864    /// Metered version of `get_blobs_v3`.
865    pub fn get_blobs_v3_metered(
866        &self,
867        versioned_hashes: Vec<B256>,
868    ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
869        let hashes_len = versioned_hashes.len();
870        let start = Instant::now();
871        let res = Self::get_blobs_v3(self, versioned_hashes);
872        self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
873
874        if let Ok(Some(blobs)) = &res {
875            let blobs_found = blobs.iter().flatten().count();
876            let blobs_missed = hashes_len - blobs_found;
877
878            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
879            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
880        }
881
882        res
883    }
884}
885
886// This is the concrete ethereum engine API implementation.
887#[async_trait]
888impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
889    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
890where
891    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
892    EngineT: EngineTypes<ExecutionData = ExecutionData>,
893    Pool: TransactionPool + 'static,
894    Validator: EngineApiValidator<EngineT>,
895    ChainSpec: EthereumHardforks + Send + Sync + 'static,
896{
897    /// Handler for `engine_newPayloadV1`
898    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
899    /// Caution: This should not accept the `withdrawals` field
900    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
901        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
902        let payload =
903            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
904        Ok(self.new_payload_v1_metered(payload).await?)
905    }
906
907    /// Handler for `engine_newPayloadV2`
908    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
909    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
910        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
911        let payload = ExecutionData {
912            payload: payload.into_payload(),
913            sidecar: ExecutionPayloadSidecar::none(),
914        };
915
916        Ok(self.new_payload_v2_metered(payload).await?)
917    }
918
919    /// Handler for `engine_newPayloadV3`
920    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
921    async fn new_payload_v3(
922        &self,
923        payload: ExecutionPayloadV3,
924        versioned_hashes: Vec<B256>,
925        parent_beacon_block_root: B256,
926    ) -> RpcResult<PayloadStatus> {
927        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
928        let payload = ExecutionData {
929            payload: payload.into(),
930            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
931                versioned_hashes,
932                parent_beacon_block_root,
933            }),
934        };
935
936        Ok(self.new_payload_v3_metered(payload).await?)
937    }
938
939    /// Handler for `engine_newPayloadV4`
940    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
941    async fn new_payload_v4(
942        &self,
943        payload: ExecutionPayloadV3,
944        versioned_hashes: Vec<B256>,
945        parent_beacon_block_root: B256,
946        requests: RequestsOrHash,
947    ) -> RpcResult<PayloadStatus> {
948        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
949
950        // Accept requests as a hash only if it is explicitly allowed
951        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
952            return Err(EngineApiError::UnexpectedRequestsHash.into());
953        }
954
955        let payload = ExecutionData {
956            payload: payload.into(),
957            sidecar: ExecutionPayloadSidecar::v4(
958                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
959                PraguePayloadFields { requests },
960            ),
961        };
962
963        Ok(self.new_payload_v4_metered(payload).await?)
964    }
965
966    /// Handler for `engine_forkchoiceUpdatedV1`
967    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
968    ///
969    /// Caution: This should not accept the `withdrawals` field
970    async fn fork_choice_updated_v1(
971        &self,
972        fork_choice_state: ForkchoiceState,
973        payload_attributes: Option<EngineT::PayloadAttributes>,
974    ) -> RpcResult<ForkchoiceUpdated> {
975        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
976        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
977    }
978
979    /// Handler for `engine_forkchoiceUpdatedV2`
980    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
981    async fn fork_choice_updated_v2(
982        &self,
983        fork_choice_state: ForkchoiceState,
984        payload_attributes: Option<EngineT::PayloadAttributes>,
985    ) -> RpcResult<ForkchoiceUpdated> {
986        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
987        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
988    }
989
990    /// Handler for `engine_forkchoiceUpdatedV3`
991    ///
992    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
993    async fn fork_choice_updated_v3(
994        &self,
995        fork_choice_state: ForkchoiceState,
996        payload_attributes: Option<EngineT::PayloadAttributes>,
997    ) -> RpcResult<ForkchoiceUpdated> {
998        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
999        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1000    }
1001
1002    /// Handler for `engine_getPayloadV1`
1003    ///
1004    /// Returns the most recent version of the payload that is available in the corresponding
1005    /// payload build process at the time of receiving this call.
1006    ///
1007    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
1008    ///
1009    /// Caution: This should not return the `withdrawals` field
1010    ///
1011    /// Note:
1012    /// > Provider software MAY stop the corresponding build process after serving this call.
1013    async fn get_payload_v1(
1014        &self,
1015        payload_id: PayloadId,
1016    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1017        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1018        Ok(self.get_payload_v1_metered(payload_id).await?)
1019    }
1020
1021    /// Handler for `engine_getPayloadV2`
1022    ///
1023    /// Returns the most recent version of the payload that is available in the corresponding
1024    /// payload build process at the time of receiving this call.
1025    ///
1026    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
1027    ///
1028    /// Note:
1029    /// > Provider software MAY stop the corresponding build process after serving this call.
1030    async fn get_payload_v2(
1031        &self,
1032        payload_id: PayloadId,
1033    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1034        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1035        Ok(self.get_payload_v2_metered(payload_id).await?)
1036    }
1037
1038    /// Handler for `engine_getPayloadV3`
1039    ///
1040    /// Returns the most recent version of the payload that is available in the corresponding
1041    /// payload build process at the time of receiving this call.
1042    ///
1043    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
1044    ///
1045    /// Note:
1046    /// > Provider software MAY stop the corresponding build process after serving this call.
1047    async fn get_payload_v3(
1048        &self,
1049        payload_id: PayloadId,
1050    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1051        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1052        Ok(self.get_payload_v3_metered(payload_id).await?)
1053    }
1054
1055    /// Handler for `engine_getPayloadV4`
1056    ///
1057    /// Returns the most recent version of the payload that is available in the corresponding
1058    /// payload build process at the time of receiving this call.
1059    ///
1060    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1061    ///
1062    /// Note:
1063    /// > Provider software MAY stop the corresponding build process after serving this call.
1064    async fn get_payload_v4(
1065        &self,
1066        payload_id: PayloadId,
1067    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1068        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1069        Ok(self.get_payload_v4_metered(payload_id).await?)
1070    }
1071
1072    /// Handler for `engine_getPayloadV5`
1073    ///
1074    /// Returns the most recent version of the payload that is available in the corresponding
1075    /// payload build process at the time of receiving this call.
1076    ///
1077    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1078    ///
1079    /// Note:
1080    /// > Provider software MAY stop the corresponding build process after serving this call.
1081    async fn get_payload_v5(
1082        &self,
1083        payload_id: PayloadId,
1084    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1085        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1086        Ok(self.get_payload_v5_metered(payload_id).await?)
1087    }
1088
1089    /// Handler for `engine_getPayloadBodiesByHashV1`
1090    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1091    async fn get_payload_bodies_by_hash_v1(
1092        &self,
1093        block_hashes: Vec<BlockHash>,
1094    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1095        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1096        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1097    }
1098
1099    /// Handler for `engine_getPayloadBodiesByRangeV1`
1100    ///
1101    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1102    ///
1103    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1104    /// blocks.
1105    ///
1106    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
1107    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1108    /// adversarial.
1109    ///
1110    /// Implementers should take care when acting on the input to this method, specifically
1111    /// ensuring that the range is limited properly, and that the range boundaries are computed
1112    /// correctly and without panics.
1113    ///
1114    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1115    async fn get_payload_bodies_by_range_v1(
1116        &self,
1117        start: U64,
1118        count: U64,
1119    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1120        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1121        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1122    }
1123
1124    /// Handler for `engine_getClientVersionV1`
1125    ///
1126    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1127    async fn get_client_version_v1(
1128        &self,
1129        client: ClientVersionV1,
1130    ) -> RpcResult<Vec<ClientVersionV1>> {
1131        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1132        Ok(Self::get_client_version_v1(self, client)?)
1133    }
1134
1135    /// Handler for `engine_exchangeCapabilitiesV1`
1136    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1137    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1138        Ok(self.capabilities().list())
1139    }
1140
1141    async fn get_blobs_v1(
1142        &self,
1143        versioned_hashes: Vec<B256>,
1144    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1145        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1146        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1147    }
1148
1149    async fn get_blobs_v2(
1150        &self,
1151        versioned_hashes: Vec<B256>,
1152    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1153        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1154        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1155    }
1156
1157    async fn get_blobs_v3(
1158        &self,
1159        versioned_hashes: Vec<B256>,
1160    ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1161        trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1162        Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1163    }
1164}
1165
1166impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1167    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1168where
1169    EngineT: EngineTypes,
1170    Self: EngineApiServer<EngineT>,
1171{
1172    fn into_rpc_module(self) -> RpcModule<()> {
1173        self.into_rpc().remove_context()
1174    }
1175}
1176
1177impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1178    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1179where
1180    PayloadT: PayloadTypes,
1181{
1182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1183        f.debug_struct("EngineApi").finish_non_exhaustive()
1184    }
1185}
1186
1187impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1188    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1189where
1190    PayloadT: PayloadTypes,
1191{
1192    fn clone(&self) -> Self {
1193        Self { inner: Arc::clone(&self.inner) }
1194    }
1195}
1196
1197/// The container type for the engine API internals.
1198struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1199    /// The provider to interact with the chain.
1200    provider: Provider,
1201    /// Consensus configuration
1202    chain_spec: Arc<ChainSpec>,
1203    /// The channel to send messages to the beacon consensus engine.
1204    beacon_consensus: ConsensusEngineHandle<PayloadT>,
1205    /// The type that can communicate with the payload service to retrieve payloads.
1206    payload_store: PayloadStore<PayloadT>,
1207    /// For spawning and executing async tasks
1208    task_spawner: Box<dyn TaskSpawner>,
1209    /// The latency and response type metrics for engine api calls
1210    metrics: EngineApiMetrics,
1211    /// Identification of the execution client used by the consensus client
1212    client: ClientVersionV1,
1213    /// The list of all supported Engine capabilities available over the engine endpoint.
1214    capabilities: EngineCapabilities,
1215    /// Transaction pool.
1216    tx_pool: Pool,
1217    /// Engine validator.
1218    validator: Validator,
1219    accept_execution_requests_hash: bool,
1220    /// Returns `true` if the node is currently syncing.
1221    is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1222}
1223
1224#[cfg(test)]
1225mod tests {
1226    use super::*;
1227    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1228    use assert_matches::assert_matches;
1229    use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1230    use reth_engine_primitives::BeaconEngineMessage;
1231    use reth_ethereum_engine_primitives::EthEngineTypes;
1232    use reth_ethereum_primitives::Block;
1233    use reth_network_api::{
1234        noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1235    };
1236    use reth_node_ethereum::EthereumEngineValidator;
1237    use reth_payload_builder::test_utils::spawn_test_payload_service;
1238    use reth_provider::test_utils::MockEthProvider;
1239    use reth_tasks::TokioTaskExecutor;
1240    use reth_transaction_pool::noop::NoopTransactionPool;
1241    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1242
1243    fn setup_engine_api() -> (
1244        EngineApiTestHandle,
1245        EngineApi<
1246            Arc<MockEthProvider>,
1247            EthEngineTypes,
1248            NoopTransactionPool,
1249            EthereumEngineValidator,
1250            ChainSpec,
1251        >,
1252    ) {
1253        let client = ClientVersionV1 {
1254            code: ClientCode::RH,
1255            name: "Reth".to_string(),
1256            version: "v0.2.0-beta.5".to_string(),
1257            commit: "defa64b2".to_string(),
1258        };
1259
1260        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1261        let provider = Arc::new(MockEthProvider::default());
1262        let payload_store = spawn_test_payload_service();
1263        let (to_engine, engine_rx) = unbounded_channel();
1264        let task_executor = Box::<TokioTaskExecutor>::default();
1265        let api = EngineApi::new(
1266            provider.clone(),
1267            chain_spec.clone(),
1268            ConsensusEngineHandle::new(to_engine),
1269            payload_store.into(),
1270            NoopTransactionPool::default(),
1271            task_executor,
1272            client,
1273            EngineCapabilities::default(),
1274            EthereumEngineValidator::new(chain_spec.clone()),
1275            false,
1276            NoopNetwork::default(),
1277        );
1278        let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1279        (handle, api)
1280    }
1281
1282    #[tokio::test]
1283    async fn engine_client_version_v1() {
1284        let client = ClientVersionV1 {
1285            code: ClientCode::RH,
1286            name: "Reth".to_string(),
1287            version: "v0.2.0-beta.5".to_string(),
1288            commit: "defa64b2".to_string(),
1289        };
1290        let (_, api) = setup_engine_api();
1291        let res = api.get_client_version_v1(client.clone());
1292        assert_eq!(res.unwrap(), vec![client]);
1293    }
1294
1295    struct EngineApiTestHandle {
1296        #[allow(dead_code)]
1297        chain_spec: Arc<ChainSpec>,
1298        provider: Arc<MockEthProvider>,
1299        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1300    }
1301
1302    #[tokio::test]
1303    async fn forwards_responses_to_consensus_engine() {
1304        let (mut handle, api) = setup_engine_api();
1305
1306        tokio::spawn(async move {
1307            let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1308            let execution_data = ExecutionData {
1309                payload: payload_v1.into(),
1310                sidecar: ExecutionPayloadSidecar::none(),
1311            };
1312
1313            api.new_payload_v1(execution_data).await.unwrap();
1314        });
1315        assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1316    }
1317
1318    #[derive(Clone)]
1319    struct TestNetworkInfo {
1320        syncing: bool,
1321    }
1322
1323    impl NetworkInfo for TestNetworkInfo {
1324        fn local_addr(&self) -> std::net::SocketAddr {
1325            (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1326        }
1327
1328        async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1329            #[allow(deprecated)]
1330            Ok(NetworkStatus {
1331                client_version: "test".to_string(),
1332                protocol_version: 5,
1333                eth_protocol_info: EthProtocolInfo {
1334                    network: 1,
1335                    difficulty: None,
1336                    genesis: Default::default(),
1337                    config: Default::default(),
1338                    head: Default::default(),
1339                },
1340                capabilities: vec![],
1341            })
1342        }
1343
1344        fn chain_id(&self) -> u64 {
1345            1
1346        }
1347
1348        fn is_syncing(&self) -> bool {
1349            self.syncing
1350        }
1351
1352        fn is_initially_syncing(&self) -> bool {
1353            self.syncing
1354        }
1355    }
1356
1357    #[tokio::test]
1358    async fn get_blobs_v3_returns_null_when_syncing() {
1359        let chain_spec: Arc<ChainSpec> =
1360            Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1361        let provider = Arc::new(MockEthProvider::default());
1362        let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1363        let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1364
1365        let api = EngineApi::new(
1366            provider,
1367            chain_spec.clone(),
1368            ConsensusEngineHandle::new(to_engine),
1369            payload_store.into(),
1370            NoopTransactionPool::default(),
1371            Box::<TokioTaskExecutor>::default(),
1372            ClientVersionV1 {
1373                code: ClientCode::RH,
1374                name: "Reth".to_string(),
1375                version: "v0.0.0-test".to_string(),
1376                commit: "test".to_string(),
1377            },
1378            EngineCapabilities::default(),
1379            EthereumEngineValidator::new(chain_spec),
1380            false,
1381            TestNetworkInfo { syncing: true },
1382        );
1383
1384        let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1385        assert_matches!(res, Ok(None));
1386    }
1387
1388    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
1389    mod get_payload_bodies {
1390        use super::*;
1391        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1392        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1393
1394        #[tokio::test]
1395        async fn invalid_params() {
1396            let (_, api) = setup_engine_api();
1397
1398            let by_range_tests = [
1399                // (start, count)
1400                (0, 0),
1401                (0, 1),
1402                (1, 0),
1403            ];
1404
1405            // test [EngineApiMessage::GetPayloadBodiesByRange]
1406            for (start, count) in by_range_tests {
1407                let res = api.get_payload_bodies_by_range_v1(start, count).await;
1408                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1409            }
1410        }
1411
1412        #[tokio::test]
1413        async fn request_too_large() {
1414            let (_, api) = setup_engine_api();
1415
1416            let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1417            let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1418            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1419        }
1420
1421        #[tokio::test]
1422        async fn returns_payload_bodies() {
1423            let mut rng = generators::rng();
1424            let (handle, api) = setup_engine_api();
1425
1426            let (start, count) = (1, 10);
1427            let blocks = random_block_range(
1428                &mut rng,
1429                start..=start + count - 1,
1430                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1431            );
1432            handle
1433                .provider
1434                .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1435
1436            let expected = blocks
1437                .iter()
1438                .cloned()
1439                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1440                .collect::<Vec<_>>();
1441
1442            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1443            assert_eq!(res, expected);
1444        }
1445
1446        #[tokio::test]
1447        async fn returns_payload_bodies_with_gaps() {
1448            let mut rng = generators::rng();
1449            let (handle, api) = setup_engine_api();
1450
1451            let (start, count) = (1, 100);
1452            let blocks = random_block_range(
1453                &mut rng,
1454                start..=start + count - 1,
1455                BlockRangeParams { tx_count: 0..2, ..Default::default() },
1456            );
1457
1458            // Insert only blocks in ranges 1-25 and 50-75
1459            let first_missing_range = 26..=50;
1460            let second_missing_range = 76..=100;
1461            handle.provider.extend_blocks(
1462                blocks
1463                    .iter()
1464                    .filter(|b| {
1465                        !first_missing_range.contains(&b.number) &&
1466                            !second_missing_range.contains(&b.number)
1467                    })
1468                    .map(|b| (b.hash(), b.clone().into_block())),
1469            );
1470
1471            let expected = blocks
1472                .iter()
1473                // filter anything after the second missing range to ensure we don't expect trailing
1474                // `None`s
1475                .filter(|b| !second_missing_range.contains(&b.number))
1476                .cloned()
1477                .map(|b| {
1478                    if first_missing_range.contains(&b.number) {
1479                        None
1480                    } else {
1481                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1482                    }
1483                })
1484                .collect::<Vec<_>>();
1485
1486            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1487            assert_eq!(res, expected);
1488
1489            let expected = blocks
1490                .iter()
1491                .cloned()
1492                // ensure we still return trailing `None`s here because by-hash will not be aware
1493                // of the missing block's number, and cannot compare it to the current best block
1494                .map(|b| {
1495                    if first_missing_range.contains(&b.number) ||
1496                        second_missing_range.contains(&b.number)
1497                    {
1498                        None
1499                    } else {
1500                        Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1501                    }
1502                })
1503                .collect::<Vec<_>>();
1504
1505            let hashes = blocks.iter().map(|b| b.hash()).collect();
1506            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1507            assert_eq!(res, expected);
1508        }
1509    }
1510}