1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes, EngineValidator};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24 validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload,
25 PayloadBuilderAttributes, PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_rpc_server_types::result::internal_rpc_err;
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::TaskSpawner;
32use reth_transaction_pool::TransactionPool;
33use std::{sync::Arc, time::Instant};
34use tokio::sync::oneshot;
35use tracing::{trace, warn};
36
37pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
39
40const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
42
43const MAX_BLOB_LIMIT: usize = 128;
45
46pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
59 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
60}
61
62struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
63 provider: Provider,
65 chain_spec: Arc<ChainSpec>,
67 beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
69 payload_store: PayloadStore<PayloadT>,
71 task_spawner: Box<dyn TaskSpawner>,
73 metrics: EngineApiMetrics,
75 client: ClientVersionV1,
77 capabilities: EngineCapabilities,
79 tx_pool: Pool,
81 validator: Validator,
83 latest_new_payload_response: Mutex<Option<Instant>>,
85 accept_execution_requests_hash: bool,
86}
87
88impl<Provider, PayloadT, Pool, Validator, ChainSpec>
89 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
90where
91 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
92 PayloadT: PayloadTypes,
93 Pool: TransactionPool + 'static,
94 Validator: EngineValidator<PayloadT>,
95 ChainSpec: EthereumHardforks + Send + Sync + 'static,
96{
97 #[expect(clippy::too_many_arguments)]
99 pub fn new(
100 provider: Provider,
101 chain_spec: Arc<ChainSpec>,
102 beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
103 payload_store: PayloadStore<PayloadT>,
104 tx_pool: Pool,
105 task_spawner: Box<dyn TaskSpawner>,
106 client: ClientVersionV1,
107 capabilities: EngineCapabilities,
108 validator: Validator,
109 accept_execution_requests_hash: bool,
110 ) -> Self {
111 let inner = Arc::new(EngineApiInner {
112 provider,
113 chain_spec,
114 beacon_consensus,
115 payload_store,
116 task_spawner,
117 metrics: EngineApiMetrics::default(),
118 client,
119 capabilities,
120 tx_pool,
121 validator,
122 latest_new_payload_response: Mutex::new(None),
123 accept_execution_requests_hash,
124 });
125 Self { inner }
126 }
127
128 pub fn get_client_version_v1(
130 &self,
131 _client: ClientVersionV1,
132 ) -> EngineApiResult<Vec<ClientVersionV1>> {
133 Ok(vec![self.inner.client.clone()])
134 }
135
136 async fn get_payload_attributes(
138 &self,
139 payload_id: PayloadId,
140 ) -> EngineApiResult<PayloadT::PayloadBuilderAttributes> {
141 Ok(self
142 .inner
143 .payload_store
144 .payload_attributes(payload_id)
145 .await
146 .ok_or(EngineApiError::UnknownPayload)??)
147 }
148
149 pub async fn new_payload_v1(
152 &self,
153 payload: PayloadT::ExecutionData,
154 ) -> EngineApiResult<PayloadStatus> {
155 let payload_or_attrs = PayloadOrAttributes::<
156 '_,
157 PayloadT::ExecutionData,
158 PayloadT::PayloadAttributes,
159 >::from_execution_payload(&payload);
160
161 self.inner
162 .validator
163 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
164
165 Ok(self
166 .inner
167 .beacon_consensus
168 .new_payload(payload)
169 .await
170 .inspect(|_| self.inner.on_new_payload_response())?)
171 }
172
173 async fn new_payload_v1_metered(
175 &self,
176 payload: PayloadT::ExecutionData,
177 ) -> EngineApiResult<PayloadStatus> {
178 let start = Instant::now();
179 let gas_used = payload.gas_used();
180
181 let res = Self::new_payload_v1(self, payload).await;
182 let elapsed = start.elapsed();
183 self.inner.metrics.latency.new_payload_v1.record(elapsed);
184 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
185 res
186 }
187
188 pub async fn new_payload_v2(
190 &self,
191 payload: PayloadT::ExecutionData,
192 ) -> EngineApiResult<PayloadStatus> {
193 let payload_or_attrs = PayloadOrAttributes::<
194 '_,
195 PayloadT::ExecutionData,
196 PayloadT::PayloadAttributes,
197 >::from_execution_payload(&payload);
198 self.inner
199 .validator
200 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
201 Ok(self
202 .inner
203 .beacon_consensus
204 .new_payload(payload)
205 .await
206 .inspect(|_| self.inner.on_new_payload_response())?)
207 }
208
209 pub async fn new_payload_v2_metered(
211 &self,
212 payload: PayloadT::ExecutionData,
213 ) -> EngineApiResult<PayloadStatus> {
214 let start = Instant::now();
215 let gas_used = payload.gas_used();
216
217 let res = Self::new_payload_v2(self, payload).await;
218 let elapsed = start.elapsed();
219 self.inner.metrics.latency.new_payload_v2.record(elapsed);
220 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
221 res
222 }
223
224 pub async fn new_payload_v3(
226 &self,
227 payload: PayloadT::ExecutionData,
228 ) -> EngineApiResult<PayloadStatus> {
229 let payload_or_attrs = PayloadOrAttributes::<
230 '_,
231 PayloadT::ExecutionData,
232 PayloadT::PayloadAttributes,
233 >::from_execution_payload(&payload);
234 self.inner
235 .validator
236 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
237
238 Ok(self
239 .inner
240 .beacon_consensus
241 .new_payload(payload)
242 .await
243 .inspect(|_| self.inner.on_new_payload_response())?)
244 }
245
246 pub async fn new_payload_v3_metered(
248 &self,
249 payload: PayloadT::ExecutionData,
250 ) -> RpcResult<PayloadStatus> {
251 let start = Instant::now();
252 let gas_used = payload.gas_used();
253
254 let res = Self::new_payload_v3(self, payload).await;
255 let elapsed = start.elapsed();
256 self.inner.metrics.latency.new_payload_v3.record(elapsed);
257 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
258 Ok(res?)
259 }
260
261 pub async fn new_payload_v4(
263 &self,
264 payload: PayloadT::ExecutionData,
265 ) -> EngineApiResult<PayloadStatus> {
266 let payload_or_attrs = PayloadOrAttributes::<
267 '_,
268 PayloadT::ExecutionData,
269 PayloadT::PayloadAttributes,
270 >::from_execution_payload(&payload);
271 self.inner
272 .validator
273 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
274
275 Ok(self
276 .inner
277 .beacon_consensus
278 .new_payload(payload)
279 .await
280 .inspect(|_| self.inner.on_new_payload_response())?)
281 }
282
283 pub async fn new_payload_v4_metered(
285 &self,
286 payload: PayloadT::ExecutionData,
287 ) -> RpcResult<PayloadStatus> {
288 let start = Instant::now();
289 let gas_used = payload.gas_used();
290
291 let res = Self::new_payload_v4(self, payload).await;
292
293 let elapsed = start.elapsed();
294 self.inner.metrics.latency.new_payload_v4.record(elapsed);
295 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
296 Ok(res?)
297 }
298}
299
300impl<Provider, EngineT, Pool, Validator, ChainSpec>
301 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
302where
303 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
304 EngineT: EngineTypes,
305 Pool: TransactionPool + 'static,
306 Validator: EngineValidator<EngineT>,
307 ChainSpec: EthereumHardforks + Send + Sync + 'static,
308{
309 pub async fn fork_choice_updated_v1(
316 &self,
317 state: ForkchoiceState,
318 payload_attrs: Option<EngineT::PayloadAttributes>,
319 ) -> EngineApiResult<ForkchoiceUpdated> {
320 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
321 .await
322 }
323
324 pub async fn fork_choice_updated_v1_metered(
326 &self,
327 state: ForkchoiceState,
328 payload_attrs: Option<EngineT::PayloadAttributes>,
329 ) -> EngineApiResult<ForkchoiceUpdated> {
330 let start = Instant::now();
331 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
332 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
333 self.inner.metrics.fcu_response.update_response_metrics(&res);
334 res
335 }
336
337 pub async fn fork_choice_updated_v2(
342 &self,
343 state: ForkchoiceState,
344 payload_attrs: Option<EngineT::PayloadAttributes>,
345 ) -> EngineApiResult<ForkchoiceUpdated> {
346 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
347 .await
348 }
349
350 pub async fn fork_choice_updated_v2_metered(
352 &self,
353 state: ForkchoiceState,
354 payload_attrs: Option<EngineT::PayloadAttributes>,
355 ) -> EngineApiResult<ForkchoiceUpdated> {
356 let start = Instant::now();
357 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
358 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
359 self.inner.metrics.fcu_response.update_response_metrics(&res);
360 res
361 }
362
363 pub async fn fork_choice_updated_v3(
368 &self,
369 state: ForkchoiceState,
370 payload_attrs: Option<EngineT::PayloadAttributes>,
371 ) -> EngineApiResult<ForkchoiceUpdated> {
372 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
373 .await
374 }
375
376 pub async fn fork_choice_updated_v3_metered(
378 &self,
379 state: ForkchoiceState,
380 payload_attrs: Option<EngineT::PayloadAttributes>,
381 ) -> EngineApiResult<ForkchoiceUpdated> {
382 let start = Instant::now();
383 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
384 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
385 self.inner.metrics.fcu_response.update_response_metrics(&res);
386 res
387 }
388
389 pub async fn get_payload_v1(
399 &self,
400 payload_id: PayloadId,
401 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
402 self.inner
403 .payload_store
404 .resolve(payload_id)
405 .await
406 .ok_or(EngineApiError::UnknownPayload)?
407 .map_err(|_| EngineApiError::UnknownPayload)?
408 .try_into()
409 .map_err(|_| {
410 warn!("could not transform built payload into ExecutionPayloadV1");
411 EngineApiError::UnknownPayload
412 })
413 }
414
415 pub async fn get_payload_v1_metered(
417 &self,
418 payload_id: PayloadId,
419 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
420 let start = Instant::now();
421 let res = Self::get_payload_v1(self, payload_id).await;
422 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
423 res
424 }
425
426 pub async fn get_payload_v2(
434 &self,
435 payload_id: PayloadId,
436 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
437 let attributes = self.get_payload_attributes(payload_id).await?;
439
440 validate_payload_timestamp(
442 &self.inner.chain_spec,
443 EngineApiMessageVersion::V2,
444 attributes.timestamp(),
445 )?;
446
447 self.inner
449 .payload_store
450 .resolve(payload_id)
451 .await
452 .ok_or(EngineApiError::UnknownPayload)?
453 .map_err(|_| EngineApiError::UnknownPayload)?
454 .try_into()
455 .map_err(|_| {
456 warn!("could not transform built payload into ExecutionPayloadV2");
457 EngineApiError::UnknownPayload
458 })
459 }
460
461 pub async fn get_payload_v2_metered(
463 &self,
464 payload_id: PayloadId,
465 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
466 let start = Instant::now();
467 let res = Self::get_payload_v2(self, payload_id).await;
468 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
469 res
470 }
471
472 pub async fn get_payload_v3(
480 &self,
481 payload_id: PayloadId,
482 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
483 let attributes = self.get_payload_attributes(payload_id).await?;
485
486 validate_payload_timestamp(
488 &self.inner.chain_spec,
489 EngineApiMessageVersion::V3,
490 attributes.timestamp(),
491 )?;
492
493 self.inner
495 .payload_store
496 .resolve(payload_id)
497 .await
498 .ok_or(EngineApiError::UnknownPayload)?
499 .map_err(|_| EngineApiError::UnknownPayload)?
500 .try_into()
501 .map_err(|_| {
502 warn!("could not transform built payload into ExecutionPayloadV3");
503 EngineApiError::UnknownPayload
504 })
505 }
506
507 pub async fn get_payload_v3_metered(
509 &self,
510 payload_id: PayloadId,
511 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
512 let start = Instant::now();
513 let res = Self::get_payload_v3(self, payload_id).await;
514 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
515 res
516 }
517
518 pub async fn get_payload_v4(
526 &self,
527 payload_id: PayloadId,
528 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
529 let attributes = self.get_payload_attributes(payload_id).await?;
531
532 validate_payload_timestamp(
534 &self.inner.chain_spec,
535 EngineApiMessageVersion::V4,
536 attributes.timestamp(),
537 )?;
538
539 self.inner
541 .payload_store
542 .resolve(payload_id)
543 .await
544 .ok_or(EngineApiError::UnknownPayload)?
545 .map_err(|_| EngineApiError::UnknownPayload)?
546 .try_into()
547 .map_err(|_| {
548 warn!("could not transform built payload into ExecutionPayloadV3");
549 EngineApiError::UnknownPayload
550 })
551 }
552
553 pub async fn get_payload_v4_metered(
555 &self,
556 payload_id: PayloadId,
557 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
558 let start = Instant::now();
559 let res = Self::get_payload_v4(self, payload_id).await;
560 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
561 res
562 }
563
564 pub async fn get_payload_bodies_by_range_with<F, R>(
567 &self,
568 start: BlockNumber,
569 count: u64,
570 f: F,
571 ) -> EngineApiResult<Vec<Option<R>>>
572 where
573 F: Fn(Provider::Block) -> R + Send + 'static,
574 R: Send + 'static,
575 {
576 let (tx, rx) = oneshot::channel();
577 let inner = self.inner.clone();
578
579 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
580 if count > MAX_PAYLOAD_BODIES_LIMIT {
581 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
582 return;
583 }
584
585 if start == 0 || count == 0 {
586 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
587 return;
588 }
589
590 let mut result = Vec::with_capacity(count as usize);
591
592 let mut end = start.saturating_add(count - 1);
594
595 if let Ok(best_block) = inner.provider.best_block_number() {
598 if end > best_block {
599 end = best_block;
600 }
601 }
602
603 for num in start..=end {
604 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
605 match block_result {
606 Ok(block) => {
607 result.push(block.map(&f));
608 }
609 Err(err) => {
610 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
611 return;
612 }
613 };
614 }
615 tx.send(Ok(result)).ok();
616 }));
617
618 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
619 }
620
621 pub async fn get_payload_bodies_by_range_v1(
632 &self,
633 start: BlockNumber,
634 count: u64,
635 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
636 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
637 transactions: block.body().encoded_2718_transactions(),
638 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
639 })
640 .await
641 }
642
643 pub async fn get_payload_bodies_by_range_v1_metered(
645 &self,
646 start: BlockNumber,
647 count: u64,
648 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
649 let start_time = Instant::now();
650 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
651 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
652 res
653 }
654
655 pub async fn get_payload_bodies_by_hash_with<F, R>(
657 &self,
658 hashes: Vec<BlockHash>,
659 f: F,
660 ) -> EngineApiResult<Vec<Option<R>>>
661 where
662 F: Fn(Provider::Block) -> R + Send + 'static,
663 R: Send + 'static,
664 {
665 let len = hashes.len() as u64;
666 if len > MAX_PAYLOAD_BODIES_LIMIT {
667 return Err(EngineApiError::PayloadRequestTooLarge { len });
668 }
669
670 let (tx, rx) = oneshot::channel();
671 let inner = self.inner.clone();
672
673 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
674 let mut result = Vec::with_capacity(hashes.len());
675 for hash in hashes {
676 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
677 match block_result {
678 Ok(block) => {
679 result.push(block.map(&f));
680 }
681 Err(err) => {
682 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
683 return;
684 }
685 }
686 }
687 tx.send(Ok(result)).ok();
688 }));
689
690 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
691 }
692
693 pub async fn get_payload_bodies_by_hash_v1(
695 &self,
696 hashes: Vec<BlockHash>,
697 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
698 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
699 transactions: block.body().encoded_2718_transactions(),
700 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
701 })
702 .await
703 }
704
705 pub async fn get_payload_bodies_by_hash_v1_metered(
707 &self,
708 hashes: Vec<BlockHash>,
709 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
710 let start = Instant::now();
711 let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
712 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
713 res.await
714 }
715
716 async fn validate_and_execute_forkchoice(
730 &self,
731 version: EngineApiMessageVersion,
732 state: ForkchoiceState,
733 payload_attrs: Option<EngineT::PayloadAttributes>,
734 ) -> EngineApiResult<ForkchoiceUpdated> {
735 self.inner.record_elapsed_time_on_fcu();
736
737 if let Some(ref attrs) = payload_attrs {
738 let attr_validation_res =
739 self.inner.validator.ensure_well_formed_attributes(version, attrs);
740
741 if let Err(err) = attr_validation_res {
755 let fcu_res =
756 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
757 if fcu_res.is_invalid() {
760 return Ok(fcu_res)
761 }
762 return Err(err.into())
763 }
764 }
765
766 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
767 }
768
769 pub fn capabilities(&self) -> &EngineCapabilities {
771 &self.inner.capabilities
772 }
773
774 fn get_blobs_v1(
775 &self,
776 versioned_hashes: Vec<B256>,
777 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
778 if versioned_hashes.len() > MAX_BLOB_LIMIT {
779 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
780 }
781
782 self.inner
783 .tx_pool
784 .get_blobs_for_versioned_hashes(&versioned_hashes)
785 .map_err(|err| EngineApiError::Internal(Box::new(err)))
786 }
787
788 fn get_blobs_v1_metered(
789 &self,
790 versioned_hashes: Vec<B256>,
791 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
792 let hashes_len = versioned_hashes.len();
793 let start = Instant::now();
794 let res = Self::get_blobs_v1(self, versioned_hashes);
795 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
796
797 if let Ok(blobs) = &res {
798 let blobs_found = blobs.iter().flatten().count();
799 let blobs_missed = hashes_len - blobs_found;
800
801 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
802 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
803 }
804
805 res
806 }
807}
808
809impl<Provider, PayloadT, Pool, Validator, ChainSpec>
810 EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
811where
812 PayloadT: PayloadTypes,
813{
814 fn record_elapsed_time_on_fcu(&self) {
817 if let Some(start_time) = self.latest_new_payload_response.lock().take() {
818 let elapsed_time = start_time.elapsed();
819 self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
820 }
821 }
822
823 fn on_new_payload_response(&self) {
825 self.latest_new_payload_response.lock().replace(Instant::now());
826 }
827}
828
829#[async_trait]
831impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
832 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
833where
834 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
835 EngineT: EngineTypes<ExecutionData = ExecutionData>,
836 Pool: TransactionPool + 'static,
837 Validator: EngineValidator<EngineT>,
838 ChainSpec: EthereumHardforks + Send + Sync + 'static,
839{
840 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
844 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
845 let payload =
846 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
847 Ok(self.new_payload_v1_metered(payload).await?)
848 }
849
850 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
853 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
854 let payload = ExecutionData {
855 payload: payload.into_payload(),
856 sidecar: ExecutionPayloadSidecar::none(),
857 };
858
859 Ok(self.new_payload_v2_metered(payload).await?)
860 }
861
862 async fn new_payload_v3(
865 &self,
866 payload: ExecutionPayloadV3,
867 versioned_hashes: Vec<B256>,
868 parent_beacon_block_root: B256,
869 ) -> RpcResult<PayloadStatus> {
870 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
871 let payload = ExecutionData {
872 payload: payload.into(),
873 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
874 versioned_hashes,
875 parent_beacon_block_root,
876 }),
877 };
878
879 Ok(self.new_payload_v3_metered(payload).await?)
880 }
881
882 async fn new_payload_v4(
885 &self,
886 payload: ExecutionPayloadV3,
887 versioned_hashes: Vec<B256>,
888 parent_beacon_block_root: B256,
889 requests: RequestsOrHash,
890 ) -> RpcResult<PayloadStatus> {
891 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
892
893 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
895 return Err(EngineApiError::UnexpectedRequestsHash.into());
896 }
897
898 let payload = ExecutionData {
899 payload: payload.into(),
900 sidecar: ExecutionPayloadSidecar::v4(
901 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
902 PraguePayloadFields { requests },
903 ),
904 };
905
906 Ok(self.new_payload_v4_metered(payload).await?)
907 }
908
909 async fn fork_choice_updated_v1(
914 &self,
915 fork_choice_state: ForkchoiceState,
916 payload_attributes: Option<EngineT::PayloadAttributes>,
917 ) -> RpcResult<ForkchoiceUpdated> {
918 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
919 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
920 }
921
922 async fn fork_choice_updated_v2(
925 &self,
926 fork_choice_state: ForkchoiceState,
927 payload_attributes: Option<EngineT::PayloadAttributes>,
928 ) -> RpcResult<ForkchoiceUpdated> {
929 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
930 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
931 }
932
933 async fn fork_choice_updated_v3(
937 &self,
938 fork_choice_state: ForkchoiceState,
939 payload_attributes: Option<EngineT::PayloadAttributes>,
940 ) -> RpcResult<ForkchoiceUpdated> {
941 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
942 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
943 }
944
945 async fn get_payload_v1(
957 &self,
958 payload_id: PayloadId,
959 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
960 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
961 Ok(self.get_payload_v1_metered(payload_id).await?)
962 }
963
964 async fn get_payload_v2(
974 &self,
975 payload_id: PayloadId,
976 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
977 trace!(target: "rpc::engine", "Serving engine_getPayloadV2");
978 Ok(self.get_payload_v2_metered(payload_id).await?)
979 }
980
981 async fn get_payload_v3(
991 &self,
992 payload_id: PayloadId,
993 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
994 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
995 Ok(self.get_payload_v3_metered(payload_id).await?)
996 }
997
998 async fn get_payload_v4(
1008 &self,
1009 payload_id: PayloadId,
1010 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1011 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1012 Ok(self.get_payload_v4_metered(payload_id).await?)
1013 }
1014
1015 async fn get_payload_bodies_by_hash_v1(
1018 &self,
1019 block_hashes: Vec<BlockHash>,
1020 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1021 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1022 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1023 }
1024
1025 async fn get_payload_bodies_by_range_v1(
1042 &self,
1043 start: U64,
1044 count: U64,
1045 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1046 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1047 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1048 }
1049
1050 async fn get_client_version_v1(
1054 &self,
1055 client: ClientVersionV1,
1056 ) -> RpcResult<Vec<ClientVersionV1>> {
1057 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1058 Ok(Self::get_client_version_v1(self, client)?)
1059 }
1060
1061 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1064 Ok(self.capabilities().list())
1065 }
1066
1067 async fn get_blobs_v1(
1068 &self,
1069 versioned_hashes: Vec<B256>,
1070 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1071 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1072 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1073 }
1074
1075 async fn get_blobs_v2(
1076 &self,
1077 _versioned_hashes: Vec<B256>,
1078 ) -> RpcResult<Vec<Option<BlobAndProofV2>>> {
1079 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1080 Err(internal_rpc_err("unimplemented"))
1081 }
1082}
1083
1084impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1085 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1086where
1087 EngineT: EngineTypes,
1088 Self: EngineApiServer<EngineT>,
1089{
1090 fn into_rpc_module(self) -> RpcModule<()> {
1091 self.into_rpc().remove_context()
1092 }
1093}
1094
1095impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1096 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1097where
1098 PayloadT: PayloadTypes,
1099{
1100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1101 f.debug_struct("EngineApi").finish_non_exhaustive()
1102 }
1103}
1104
1105#[cfg(test)]
1106mod tests {
1107 use super::*;
1108 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1109 use assert_matches::assert_matches;
1110 use reth_chainspec::{ChainSpec, MAINNET};
1111 use reth_engine_primitives::BeaconEngineMessage;
1112 use reth_ethereum_engine_primitives::EthEngineTypes;
1113 use reth_ethereum_primitives::Block;
1114 use reth_node_ethereum::EthereumEngineValidator;
1115 use reth_payload_builder::test_utils::spawn_test_payload_service;
1116 use reth_provider::test_utils::MockEthProvider;
1117 use reth_tasks::TokioTaskExecutor;
1118 use reth_transaction_pool::noop::NoopTransactionPool;
1119 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1120
1121 fn setup_engine_api() -> (
1122 EngineApiTestHandle,
1123 EngineApi<
1124 Arc<MockEthProvider>,
1125 EthEngineTypes,
1126 NoopTransactionPool,
1127 EthereumEngineValidator,
1128 ChainSpec,
1129 >,
1130 ) {
1131 let client = ClientVersionV1 {
1132 code: ClientCode::RH,
1133 name: "Reth".to_string(),
1134 version: "v0.2.0-beta.5".to_string(),
1135 commit: "defa64b2".to_string(),
1136 };
1137
1138 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1139 let provider = Arc::new(MockEthProvider::default());
1140 let payload_store = spawn_test_payload_service();
1141 let (to_engine, engine_rx) = unbounded_channel();
1142 let task_executor = Box::<TokioTaskExecutor>::default();
1143 let api = EngineApi::new(
1144 provider.clone(),
1145 chain_spec.clone(),
1146 BeaconConsensusEngineHandle::new(to_engine),
1147 payload_store.into(),
1148 NoopTransactionPool::default(),
1149 task_executor,
1150 client,
1151 EngineCapabilities::default(),
1152 EthereumEngineValidator::new(chain_spec.clone()),
1153 false,
1154 );
1155 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1156 (handle, api)
1157 }
1158
1159 #[tokio::test]
1160 async fn engine_client_version_v1() {
1161 let client = ClientVersionV1 {
1162 code: ClientCode::RH,
1163 name: "Reth".to_string(),
1164 version: "v0.2.0-beta.5".to_string(),
1165 commit: "defa64b2".to_string(),
1166 };
1167 let (_, api) = setup_engine_api();
1168 let res = api.get_client_version_v1(client.clone());
1169 assert_eq!(res.unwrap(), vec![client]);
1170 }
1171
1172 struct EngineApiTestHandle {
1173 #[allow(dead_code)]
1174 chain_spec: Arc<ChainSpec>,
1175 provider: Arc<MockEthProvider>,
1176 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1177 }
1178
1179 #[tokio::test]
1180 async fn forwards_responses_to_consensus_engine() {
1181 let (mut handle, api) = setup_engine_api();
1182
1183 tokio::spawn(async move {
1184 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1185 let execution_data = ExecutionData {
1186 payload: payload_v1.into(),
1187 sidecar: ExecutionPayloadSidecar::none(),
1188 };
1189
1190 api.new_payload_v1(execution_data).await.unwrap();
1191 });
1192 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1193 }
1194
1195 mod get_payload_bodies {
1197 use super::*;
1198 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1199 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1200
1201 #[tokio::test]
1202 async fn invalid_params() {
1203 let (_, api) = setup_engine_api();
1204
1205 let by_range_tests = [
1206 (0, 0),
1208 (0, 1),
1209 (1, 0),
1210 ];
1211
1212 for (start, count) in by_range_tests {
1214 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1215 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1216 }
1217 }
1218
1219 #[tokio::test]
1220 async fn request_too_large() {
1221 let (_, api) = setup_engine_api();
1222
1223 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1224 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1225 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1226 }
1227
1228 #[tokio::test]
1229 async fn returns_payload_bodies() {
1230 let mut rng = generators::rng();
1231 let (handle, api) = setup_engine_api();
1232
1233 let (start, count) = (1, 10);
1234 let blocks = random_block_range(
1235 &mut rng,
1236 start..=start + count - 1,
1237 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1238 );
1239 handle
1240 .provider
1241 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1242
1243 let expected = blocks
1244 .iter()
1245 .cloned()
1246 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1247 .collect::<Vec<_>>();
1248
1249 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1250 assert_eq!(res, expected);
1251 }
1252
1253 #[tokio::test]
1254 async fn returns_payload_bodies_with_gaps() {
1255 let mut rng = generators::rng();
1256 let (handle, api) = setup_engine_api();
1257
1258 let (start, count) = (1, 100);
1259 let blocks = random_block_range(
1260 &mut rng,
1261 start..=start + count - 1,
1262 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1263 );
1264
1265 let first_missing_range = 26..=50;
1267 let second_missing_range = 76..=100;
1268 handle.provider.extend_blocks(
1269 blocks
1270 .iter()
1271 .filter(|b| {
1272 !first_missing_range.contains(&b.number) &&
1273 !second_missing_range.contains(&b.number)
1274 })
1275 .map(|b| (b.hash(), b.clone().into_block())),
1276 );
1277
1278 let expected = blocks
1279 .iter()
1280 .filter(|b| !second_missing_range.contains(&b.number))
1283 .cloned()
1284 .map(|b| {
1285 if first_missing_range.contains(&b.number) {
1286 None
1287 } else {
1288 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1289 }
1290 })
1291 .collect::<Vec<_>>();
1292
1293 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1294 assert_eq!(res, expected);
1295
1296 let expected = blocks
1297 .iter()
1298 .cloned()
1299 .map(|b| {
1302 if first_missing_range.contains(&b.number) ||
1303 second_missing_range.contains(&b.number)
1304 {
1305 None
1306 } else {
1307 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1308 }
1309 })
1310 .collect::<Vec<_>>();
1311
1312 let hashes = blocks.iter().map(|b| b.hash()).collect();
1313 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1314 assert_eq!(res, expected);
1315 }
1316 }
1317}