1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes, EngineValidator};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24 validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload,
25 PayloadBuilderAttributes, PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{sync::Arc, time::Instant};
33use tokio::sync::oneshot;
34use tracing::{debug, trace, warn};
35
36pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
38
39const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
41
42const MAX_BLOB_LIMIT: usize = 128;
44
45pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
61 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
62}
63
64impl<Provider, PayloadT, Pool, Validator, ChainSpec>
65 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
66where
67 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
68 PayloadT: PayloadTypes,
69 Pool: TransactionPool + 'static,
70 Validator: EngineValidator<PayloadT>,
71 ChainSpec: EthereumHardforks + Send + Sync + 'static,
72{
73 #[expect(clippy::too_many_arguments)]
75 pub fn new(
76 provider: Provider,
77 chain_spec: Arc<ChainSpec>,
78 beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
79 payload_store: PayloadStore<PayloadT>,
80 tx_pool: Pool,
81 task_spawner: Box<dyn TaskSpawner>,
82 client: ClientVersionV1,
83 capabilities: EngineCapabilities,
84 validator: Validator,
85 accept_execution_requests_hash: bool,
86 ) -> Self {
87 let inner = Arc::new(EngineApiInner {
88 provider,
89 chain_spec,
90 beacon_consensus,
91 payload_store,
92 task_spawner,
93 metrics: EngineApiMetrics::default(),
94 client,
95 capabilities,
96 tx_pool,
97 validator,
98 latest_new_payload_response: Mutex::new(None),
99 accept_execution_requests_hash,
100 });
101 Self { inner }
102 }
103
104 pub fn get_client_version_v1(
106 &self,
107 _client: ClientVersionV1,
108 ) -> EngineApiResult<Vec<ClientVersionV1>> {
109 Ok(vec![self.inner.client.clone()])
110 }
111
112 async fn get_payload_attributes(
114 &self,
115 payload_id: PayloadId,
116 ) -> EngineApiResult<PayloadT::PayloadBuilderAttributes> {
117 Ok(self
118 .inner
119 .payload_store
120 .payload_attributes(payload_id)
121 .await
122 .ok_or(EngineApiError::UnknownPayload)??)
123 }
124
125 pub async fn new_payload_v1(
128 &self,
129 payload: PayloadT::ExecutionData,
130 ) -> EngineApiResult<PayloadStatus> {
131 let payload_or_attrs = PayloadOrAttributes::<
132 '_,
133 PayloadT::ExecutionData,
134 PayloadT::PayloadAttributes,
135 >::from_execution_payload(&payload);
136
137 self.inner
138 .validator
139 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
140
141 Ok(self
142 .inner
143 .beacon_consensus
144 .new_payload(payload)
145 .await
146 .inspect(|_| self.inner.on_new_payload_response())?)
147 }
148
149 pub async fn new_payload_v1_metered(
151 &self,
152 payload: PayloadT::ExecutionData,
153 ) -> EngineApiResult<PayloadStatus> {
154 let start = Instant::now();
155 let gas_used = payload.gas_used();
156
157 let res = Self::new_payload_v1(self, payload).await;
158 let elapsed = start.elapsed();
159 self.inner.metrics.latency.new_payload_v1.record(elapsed);
160 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
161 res
162 }
163
164 pub async fn new_payload_v2(
166 &self,
167 payload: PayloadT::ExecutionData,
168 ) -> EngineApiResult<PayloadStatus> {
169 let payload_or_attrs = PayloadOrAttributes::<
170 '_,
171 PayloadT::ExecutionData,
172 PayloadT::PayloadAttributes,
173 >::from_execution_payload(&payload);
174 self.inner
175 .validator
176 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
177 Ok(self
178 .inner
179 .beacon_consensus
180 .new_payload(payload)
181 .await
182 .inspect(|_| self.inner.on_new_payload_response())?)
183 }
184
185 pub async fn new_payload_v2_metered(
187 &self,
188 payload: PayloadT::ExecutionData,
189 ) -> EngineApiResult<PayloadStatus> {
190 let start = Instant::now();
191 let gas_used = payload.gas_used();
192
193 let res = Self::new_payload_v2(self, payload).await;
194 let elapsed = start.elapsed();
195 self.inner.metrics.latency.new_payload_v2.record(elapsed);
196 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
197 res
198 }
199
200 pub async fn new_payload_v3(
202 &self,
203 payload: PayloadT::ExecutionData,
204 ) -> EngineApiResult<PayloadStatus> {
205 let payload_or_attrs = PayloadOrAttributes::<
206 '_,
207 PayloadT::ExecutionData,
208 PayloadT::PayloadAttributes,
209 >::from_execution_payload(&payload);
210 self.inner
211 .validator
212 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
213
214 Ok(self
215 .inner
216 .beacon_consensus
217 .new_payload(payload)
218 .await
219 .inspect(|_| self.inner.on_new_payload_response())?)
220 }
221
222 pub async fn new_payload_v3_metered(
224 &self,
225 payload: PayloadT::ExecutionData,
226 ) -> RpcResult<PayloadStatus> {
227 let start = Instant::now();
228 let gas_used = payload.gas_used();
229
230 let res = Self::new_payload_v3(self, payload).await;
231 let elapsed = start.elapsed();
232 self.inner.metrics.latency.new_payload_v3.record(elapsed);
233 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
234 Ok(res?)
235 }
236
237 pub async fn new_payload_v4(
239 &self,
240 payload: PayloadT::ExecutionData,
241 ) -> EngineApiResult<PayloadStatus> {
242 let payload_or_attrs = PayloadOrAttributes::<
243 '_,
244 PayloadT::ExecutionData,
245 PayloadT::PayloadAttributes,
246 >::from_execution_payload(&payload);
247 self.inner
248 .validator
249 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
250
251 Ok(self
252 .inner
253 .beacon_consensus
254 .new_payload(payload)
255 .await
256 .inspect(|_| self.inner.on_new_payload_response())?)
257 }
258
259 pub async fn new_payload_v4_metered(
261 &self,
262 payload: PayloadT::ExecutionData,
263 ) -> RpcResult<PayloadStatus> {
264 let start = Instant::now();
265 let gas_used = payload.gas_used();
266
267 let res = Self::new_payload_v4(self, payload).await;
268
269 let elapsed = start.elapsed();
270 self.inner.metrics.latency.new_payload_v4.record(elapsed);
271 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
272 Ok(res?)
273 }
274
275 pub fn accept_execution_requests_hash(&self) -> bool {
277 self.inner.accept_execution_requests_hash
278 }
279}
280
281impl<Provider, EngineT, Pool, Validator, ChainSpec>
282 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
283where
284 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
285 EngineT: EngineTypes,
286 Pool: TransactionPool + 'static,
287 Validator: EngineValidator<EngineT>,
288 ChainSpec: EthereumHardforks + Send + Sync + 'static,
289{
290 pub async fn fork_choice_updated_v1(
297 &self,
298 state: ForkchoiceState,
299 payload_attrs: Option<EngineT::PayloadAttributes>,
300 ) -> EngineApiResult<ForkchoiceUpdated> {
301 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
302 .await
303 }
304
305 pub async fn fork_choice_updated_v1_metered(
307 &self,
308 state: ForkchoiceState,
309 payload_attrs: Option<EngineT::PayloadAttributes>,
310 ) -> EngineApiResult<ForkchoiceUpdated> {
311 let start = Instant::now();
312 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
313 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
314 self.inner.metrics.fcu_response.update_response_metrics(&res);
315 res
316 }
317
318 pub async fn fork_choice_updated_v2(
323 &self,
324 state: ForkchoiceState,
325 payload_attrs: Option<EngineT::PayloadAttributes>,
326 ) -> EngineApiResult<ForkchoiceUpdated> {
327 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
328 .await
329 }
330
331 pub async fn fork_choice_updated_v2_metered(
333 &self,
334 state: ForkchoiceState,
335 payload_attrs: Option<EngineT::PayloadAttributes>,
336 ) -> EngineApiResult<ForkchoiceUpdated> {
337 let start = Instant::now();
338 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
339 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
340 self.inner.metrics.fcu_response.update_response_metrics(&res);
341 res
342 }
343
344 pub async fn fork_choice_updated_v3(
349 &self,
350 state: ForkchoiceState,
351 payload_attrs: Option<EngineT::PayloadAttributes>,
352 ) -> EngineApiResult<ForkchoiceUpdated> {
353 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
354 .await
355 }
356
357 pub async fn fork_choice_updated_v3_metered(
359 &self,
360 state: ForkchoiceState,
361 payload_attrs: Option<EngineT::PayloadAttributes>,
362 ) -> EngineApiResult<ForkchoiceUpdated> {
363 let start = Instant::now();
364 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
365 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
366 self.inner.metrics.fcu_response.update_response_metrics(&res);
367 res
368 }
369
370 async fn get_built_payload(
372 &self,
373 payload_id: PayloadId,
374 ) -> EngineApiResult<EngineT::BuiltPayload> {
375 self.inner
376 .payload_store
377 .resolve(payload_id)
378 .await
379 .ok_or(EngineApiError::UnknownPayload)?
380 .map_err(|_| EngineApiError::UnknownPayload)
381 }
382
383 async fn get_payload_inner<R>(
386 &self,
387 payload_id: PayloadId,
388 version: EngineApiMessageVersion,
389 ) -> EngineApiResult<R>
390 where
391 EngineT::BuiltPayload: TryInto<R>,
392 {
393 let attributes = self.get_payload_attributes(payload_id).await?;
395
396 validate_payload_timestamp(&self.inner.chain_spec, version, attributes.timestamp())?;
398
399 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
401 warn!(?version, "could not transform built payload");
402 EngineApiError::UnknownPayload
403 })
404 }
405
406 pub async fn get_payload_v1(
416 &self,
417 payload_id: PayloadId,
418 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
419 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
420 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
421 EngineApiError::UnknownPayload
422 })
423 }
424
425 pub async fn get_payload_v1_metered(
427 &self,
428 payload_id: PayloadId,
429 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
430 let start = Instant::now();
431 let res = Self::get_payload_v1(self, payload_id).await;
432 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
433 res
434 }
435
436 pub async fn get_payload_v2(
444 &self,
445 payload_id: PayloadId,
446 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
447 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
448 }
449
450 pub async fn get_payload_v2_metered(
452 &self,
453 payload_id: PayloadId,
454 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
455 let start = Instant::now();
456 let res = Self::get_payload_v2(self, payload_id).await;
457 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
458 res
459 }
460
461 pub async fn get_payload_v3(
469 &self,
470 payload_id: PayloadId,
471 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
472 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
473 }
474
475 pub async fn get_payload_v3_metered(
477 &self,
478 payload_id: PayloadId,
479 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
480 let start = Instant::now();
481 let res = Self::get_payload_v3(self, payload_id).await;
482 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
483 res
484 }
485
486 pub async fn get_payload_v4(
494 &self,
495 payload_id: PayloadId,
496 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
497 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
498 }
499
500 pub async fn get_payload_v4_metered(
502 &self,
503 payload_id: PayloadId,
504 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
505 let start = Instant::now();
506 let res = Self::get_payload_v4(self, payload_id).await;
507 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
508 res
509 }
510
511 pub async fn get_payload_v5(
521 &self,
522 payload_id: PayloadId,
523 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
524 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
525 }
526
527 pub async fn get_payload_v5_metered(
529 &self,
530 payload_id: PayloadId,
531 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
532 let start = Instant::now();
533 let res = Self::get_payload_v5(self, payload_id).await;
534 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
535 res
536 }
537
538 pub async fn get_payload_bodies_by_range_with<F, R>(
541 &self,
542 start: BlockNumber,
543 count: u64,
544 f: F,
545 ) -> EngineApiResult<Vec<Option<R>>>
546 where
547 F: Fn(Provider::Block) -> R + Send + 'static,
548 R: Send + 'static,
549 {
550 let (tx, rx) = oneshot::channel();
551 let inner = self.inner.clone();
552
553 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
554 if count > MAX_PAYLOAD_BODIES_LIMIT {
555 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
556 return;
557 }
558
559 if start == 0 || count == 0 {
560 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
561 return;
562 }
563
564 let mut result = Vec::with_capacity(count as usize);
565
566 let mut end = start.saturating_add(count - 1);
568
569 if let Ok(best_block) = inner.provider.best_block_number() {
572 if end > best_block {
573 end = best_block;
574 }
575 }
576
577 for num in start..=end {
578 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
579 match block_result {
580 Ok(block) => {
581 result.push(block.map(&f));
582 }
583 Err(err) => {
584 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
585 return;
586 }
587 };
588 }
589 tx.send(Ok(result)).ok();
590 }));
591
592 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
593 }
594
595 pub async fn get_payload_bodies_by_range_v1(
606 &self,
607 start: BlockNumber,
608 count: u64,
609 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
610 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
611 transactions: block.body().encoded_2718_transactions(),
612 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
613 })
614 .await
615 }
616
617 pub async fn get_payload_bodies_by_range_v1_metered(
619 &self,
620 start: BlockNumber,
621 count: u64,
622 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
623 let start_time = Instant::now();
624 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
625 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
626 res
627 }
628
629 pub async fn get_payload_bodies_by_hash_with<F, R>(
631 &self,
632 hashes: Vec<BlockHash>,
633 f: F,
634 ) -> EngineApiResult<Vec<Option<R>>>
635 where
636 F: Fn(Provider::Block) -> R + Send + 'static,
637 R: Send + 'static,
638 {
639 let len = hashes.len() as u64;
640 if len > MAX_PAYLOAD_BODIES_LIMIT {
641 return Err(EngineApiError::PayloadRequestTooLarge { len });
642 }
643
644 let (tx, rx) = oneshot::channel();
645 let inner = self.inner.clone();
646
647 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
648 let mut result = Vec::with_capacity(hashes.len());
649 for hash in hashes {
650 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
651 match block_result {
652 Ok(block) => {
653 result.push(block.map(&f));
654 }
655 Err(err) => {
656 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
657 return;
658 }
659 }
660 }
661 tx.send(Ok(result)).ok();
662 }));
663
664 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
665 }
666
667 pub async fn get_payload_bodies_by_hash_v1(
669 &self,
670 hashes: Vec<BlockHash>,
671 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
672 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
673 transactions: block.body().encoded_2718_transactions(),
674 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
675 })
676 .await
677 }
678
679 pub async fn get_payload_bodies_by_hash_v1_metered(
681 &self,
682 hashes: Vec<BlockHash>,
683 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
684 let start = Instant::now();
685 let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
686 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
687 res.await
688 }
689
690 async fn validate_and_execute_forkchoice(
704 &self,
705 version: EngineApiMessageVersion,
706 state: ForkchoiceState,
707 payload_attrs: Option<EngineT::PayloadAttributes>,
708 ) -> EngineApiResult<ForkchoiceUpdated> {
709 self.inner.record_elapsed_time_on_fcu();
710
711 if let Some(ref attrs) = payload_attrs {
712 let attr_validation_res =
713 self.inner.validator.ensure_well_formed_attributes(version, attrs);
714
715 if let Err(err) = attr_validation_res {
729 let fcu_res =
730 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
731 if fcu_res.is_invalid() {
734 return Ok(fcu_res)
735 }
736 return Err(err.into())
737 }
738 }
739
740 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
741 }
742
743 pub fn capabilities(&self) -> &EngineCapabilities {
745 &self.inner.capabilities
746 }
747
748 fn get_blobs_v1(
749 &self,
750 versioned_hashes: Vec<B256>,
751 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
752 if versioned_hashes.len() > MAX_BLOB_LIMIT {
753 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
754 }
755
756 self.inner
757 .tx_pool
758 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
759 .map_err(|err| EngineApiError::Internal(Box::new(err)))
760 }
761
762 pub fn get_blobs_v1_metered(
764 &self,
765 versioned_hashes: Vec<B256>,
766 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
767 let hashes_len = versioned_hashes.len();
768 let start = Instant::now();
769 let res = Self::get_blobs_v1(self, versioned_hashes);
770 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
771
772 if let Ok(blobs) = &res {
773 let blobs_found = blobs.iter().flatten().count();
774 let blobs_missed = hashes_len - blobs_found;
775
776 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
777 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
778 }
779
780 res
781 }
782
783 fn get_blobs_v2(
784 &self,
785 versioned_hashes: Vec<B256>,
786 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
787 if versioned_hashes.len() > MAX_BLOB_LIMIT {
788 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
789 }
790
791 self.inner
792 .tx_pool
793 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
794 .map_err(|err| EngineApiError::Internal(Box::new(err)))
795 }
796
797 pub fn get_blobs_v2_metered(
799 &self,
800 versioned_hashes: Vec<B256>,
801 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
802 let hashes_len = versioned_hashes.len();
803 let start = Instant::now();
804 let res = Self::get_blobs_v2(self, versioned_hashes);
805 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
806
807 if let Ok(blobs) = &res {
808 let blobs_found = blobs.iter().flatten().count();
809
810 self.inner
811 .metrics
812 .blob_metrics
813 .get_blobs_requests_blobs_total
814 .increment(hashes_len as u64);
815 self.inner
816 .metrics
817 .blob_metrics
818 .get_blobs_requests_blobs_in_blobpool_total
819 .increment(blobs_found as u64);
820
821 if blobs_found == hashes_len {
822 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
823 } else {
824 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
825 }
826 } else {
827 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
828 }
829
830 res
831 }
832}
833
834#[async_trait]
836impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
837 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
838where
839 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
840 EngineT: EngineTypes<ExecutionData = ExecutionData>,
841 Pool: TransactionPool + 'static,
842 Validator: EngineValidator<EngineT>,
843 ChainSpec: EthereumHardforks + Send + Sync + 'static,
844{
845 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
849 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
850 let payload =
851 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
852 Ok(self.new_payload_v1_metered(payload).await?)
853 }
854
855 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
858 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
859 let payload = ExecutionData {
860 payload: payload.into_payload(),
861 sidecar: ExecutionPayloadSidecar::none(),
862 };
863
864 Ok(self.new_payload_v2_metered(payload).await?)
865 }
866
867 async fn new_payload_v3(
870 &self,
871 payload: ExecutionPayloadV3,
872 versioned_hashes: Vec<B256>,
873 parent_beacon_block_root: B256,
874 ) -> RpcResult<PayloadStatus> {
875 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
876 let payload = ExecutionData {
877 payload: payload.into(),
878 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
879 versioned_hashes,
880 parent_beacon_block_root,
881 }),
882 };
883
884 Ok(self.new_payload_v3_metered(payload).await?)
885 }
886
887 async fn new_payload_v4(
890 &self,
891 payload: ExecutionPayloadV3,
892 versioned_hashes: Vec<B256>,
893 parent_beacon_block_root: B256,
894 requests: RequestsOrHash,
895 ) -> RpcResult<PayloadStatus> {
896 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
897
898 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
900 return Err(EngineApiError::UnexpectedRequestsHash.into());
901 }
902
903 let payload = ExecutionData {
904 payload: payload.into(),
905 sidecar: ExecutionPayloadSidecar::v4(
906 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
907 PraguePayloadFields { requests },
908 ),
909 };
910
911 Ok(self.new_payload_v4_metered(payload).await?)
912 }
913
914 async fn fork_choice_updated_v1(
919 &self,
920 fork_choice_state: ForkchoiceState,
921 payload_attributes: Option<EngineT::PayloadAttributes>,
922 ) -> RpcResult<ForkchoiceUpdated> {
923 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
924 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
925 }
926
927 async fn fork_choice_updated_v2(
930 &self,
931 fork_choice_state: ForkchoiceState,
932 payload_attributes: Option<EngineT::PayloadAttributes>,
933 ) -> RpcResult<ForkchoiceUpdated> {
934 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
935 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
936 }
937
938 async fn fork_choice_updated_v3(
942 &self,
943 fork_choice_state: ForkchoiceState,
944 payload_attributes: Option<EngineT::PayloadAttributes>,
945 ) -> RpcResult<ForkchoiceUpdated> {
946 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
947 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
948 }
949
950 async fn get_payload_v1(
962 &self,
963 payload_id: PayloadId,
964 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
965 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
966 Ok(self.get_payload_v1_metered(payload_id).await?)
967 }
968
969 async fn get_payload_v2(
979 &self,
980 payload_id: PayloadId,
981 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
982 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
983 Ok(self.get_payload_v2_metered(payload_id).await?)
984 }
985
986 async fn get_payload_v3(
996 &self,
997 payload_id: PayloadId,
998 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
999 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1000 Ok(self.get_payload_v3_metered(payload_id).await?)
1001 }
1002
1003 async fn get_payload_v4(
1013 &self,
1014 payload_id: PayloadId,
1015 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1016 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1017 Ok(self.get_payload_v4_metered(payload_id).await?)
1018 }
1019
1020 async fn get_payload_v5(
1030 &self,
1031 payload_id: PayloadId,
1032 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1033 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1034 Ok(self.get_payload_v5_metered(payload_id).await?)
1035 }
1036
1037 async fn get_payload_bodies_by_hash_v1(
1040 &self,
1041 block_hashes: Vec<BlockHash>,
1042 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1043 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1044 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1045 }
1046
1047 async fn get_payload_bodies_by_range_v1(
1064 &self,
1065 start: U64,
1066 count: U64,
1067 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1068 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1069 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1070 }
1071
1072 async fn get_client_version_v1(
1076 &self,
1077 client: ClientVersionV1,
1078 ) -> RpcResult<Vec<ClientVersionV1>> {
1079 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1080 Ok(Self::get_client_version_v1(self, client)?)
1081 }
1082
1083 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1086 Ok(self.capabilities().list())
1087 }
1088
1089 async fn get_blobs_v1(
1090 &self,
1091 versioned_hashes: Vec<B256>,
1092 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1093 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1094 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1095 }
1096
1097 async fn get_blobs_v2(
1098 &self,
1099 versioned_hashes: Vec<B256>,
1100 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1101 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1102 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1103 }
1104}
1105
1106impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1107 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1108where
1109 EngineT: EngineTypes,
1110 Self: EngineApiServer<EngineT>,
1111{
1112 fn into_rpc_module(self) -> RpcModule<()> {
1113 self.into_rpc().remove_context()
1114 }
1115}
1116
1117impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1118 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1119where
1120 PayloadT: PayloadTypes,
1121{
1122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1123 f.debug_struct("EngineApi").finish_non_exhaustive()
1124 }
1125}
1126
1127impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1128 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1129where
1130 PayloadT: PayloadTypes,
1131{
1132 fn clone(&self) -> Self {
1133 Self { inner: Arc::clone(&self.inner) }
1134 }
1135}
1136
1137struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1139 provider: Provider,
1141 chain_spec: Arc<ChainSpec>,
1143 beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
1145 payload_store: PayloadStore<PayloadT>,
1147 task_spawner: Box<dyn TaskSpawner>,
1149 metrics: EngineApiMetrics,
1151 client: ClientVersionV1,
1153 capabilities: EngineCapabilities,
1155 tx_pool: Pool,
1157 validator: Validator,
1159 latest_new_payload_response: Mutex<Option<Instant>>,
1161 accept_execution_requests_hash: bool,
1162}
1163
1164impl<Provider, PayloadT, Pool, Validator, ChainSpec>
1165 EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
1166where
1167 PayloadT: PayloadTypes,
1168{
1169 fn record_elapsed_time_on_fcu(&self) {
1172 if let Some(start_time) = self.latest_new_payload_response.lock().take() {
1173 let elapsed_time = start_time.elapsed();
1174 self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
1175 }
1176 }
1177
1178 fn on_new_payload_response(&self) {
1180 self.latest_new_payload_response.lock().replace(Instant::now());
1181 }
1182}
1183
1184#[cfg(test)]
1185mod tests {
1186 use super::*;
1187 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1188 use assert_matches::assert_matches;
1189 use reth_chainspec::{ChainSpec, MAINNET};
1190 use reth_engine_primitives::BeaconEngineMessage;
1191 use reth_ethereum_engine_primitives::EthEngineTypes;
1192 use reth_ethereum_primitives::Block;
1193 use reth_node_ethereum::EthereumEngineValidator;
1194 use reth_payload_builder::test_utils::spawn_test_payload_service;
1195 use reth_provider::test_utils::MockEthProvider;
1196 use reth_tasks::TokioTaskExecutor;
1197 use reth_transaction_pool::noop::NoopTransactionPool;
1198 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1199
1200 fn setup_engine_api() -> (
1201 EngineApiTestHandle,
1202 EngineApi<
1203 Arc<MockEthProvider>,
1204 EthEngineTypes,
1205 NoopTransactionPool,
1206 EthereumEngineValidator,
1207 ChainSpec,
1208 >,
1209 ) {
1210 let client = ClientVersionV1 {
1211 code: ClientCode::RH,
1212 name: "Reth".to_string(),
1213 version: "v0.2.0-beta.5".to_string(),
1214 commit: "defa64b2".to_string(),
1215 };
1216
1217 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1218 let provider = Arc::new(MockEthProvider::default());
1219 let payload_store = spawn_test_payload_service();
1220 let (to_engine, engine_rx) = unbounded_channel();
1221 let task_executor = Box::<TokioTaskExecutor>::default();
1222 let api = EngineApi::new(
1223 provider.clone(),
1224 chain_spec.clone(),
1225 BeaconConsensusEngineHandle::new(to_engine),
1226 payload_store.into(),
1227 NoopTransactionPool::default(),
1228 task_executor,
1229 client,
1230 EngineCapabilities::default(),
1231 EthereumEngineValidator::new(chain_spec.clone()),
1232 false,
1233 );
1234 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1235 (handle, api)
1236 }
1237
1238 #[tokio::test]
1239 async fn engine_client_version_v1() {
1240 let client = ClientVersionV1 {
1241 code: ClientCode::RH,
1242 name: "Reth".to_string(),
1243 version: "v0.2.0-beta.5".to_string(),
1244 commit: "defa64b2".to_string(),
1245 };
1246 let (_, api) = setup_engine_api();
1247 let res = api.get_client_version_v1(client.clone());
1248 assert_eq!(res.unwrap(), vec![client]);
1249 }
1250
1251 struct EngineApiTestHandle {
1252 #[allow(dead_code)]
1253 chain_spec: Arc<ChainSpec>,
1254 provider: Arc<MockEthProvider>,
1255 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1256 }
1257
1258 #[tokio::test]
1259 async fn forwards_responses_to_consensus_engine() {
1260 let (mut handle, api) = setup_engine_api();
1261
1262 tokio::spawn(async move {
1263 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1264 let execution_data = ExecutionData {
1265 payload: payload_v1.into(),
1266 sidecar: ExecutionPayloadSidecar::none(),
1267 };
1268
1269 api.new_payload_v1(execution_data).await.unwrap();
1270 });
1271 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1272 }
1273
1274 mod get_payload_bodies {
1276 use super::*;
1277 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1278 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1279
1280 #[tokio::test]
1281 async fn invalid_params() {
1282 let (_, api) = setup_engine_api();
1283
1284 let by_range_tests = [
1285 (0, 0),
1287 (0, 1),
1288 (1, 0),
1289 ];
1290
1291 for (start, count) in by_range_tests {
1293 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1294 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1295 }
1296 }
1297
1298 #[tokio::test]
1299 async fn request_too_large() {
1300 let (_, api) = setup_engine_api();
1301
1302 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1303 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1304 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1305 }
1306
1307 #[tokio::test]
1308 async fn returns_payload_bodies() {
1309 let mut rng = generators::rng();
1310 let (handle, api) = setup_engine_api();
1311
1312 let (start, count) = (1, 10);
1313 let blocks = random_block_range(
1314 &mut rng,
1315 start..=start + count - 1,
1316 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1317 );
1318 handle
1319 .provider
1320 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1321
1322 let expected = blocks
1323 .iter()
1324 .cloned()
1325 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1326 .collect::<Vec<_>>();
1327
1328 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1329 assert_eq!(res, expected);
1330 }
1331
1332 #[tokio::test]
1333 async fn returns_payload_bodies_with_gaps() {
1334 let mut rng = generators::rng();
1335 let (handle, api) = setup_engine_api();
1336
1337 let (start, count) = (1, 100);
1338 let blocks = random_block_range(
1339 &mut rng,
1340 start..=start + count - 1,
1341 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1342 );
1343
1344 let first_missing_range = 26..=50;
1346 let second_missing_range = 76..=100;
1347 handle.provider.extend_blocks(
1348 blocks
1349 .iter()
1350 .filter(|b| {
1351 !first_missing_range.contains(&b.number) &&
1352 !second_missing_range.contains(&b.number)
1353 })
1354 .map(|b| (b.hash(), b.clone().into_block())),
1355 );
1356
1357 let expected = blocks
1358 .iter()
1359 .filter(|b| !second_missing_range.contains(&b.number))
1362 .cloned()
1363 .map(|b| {
1364 if first_missing_range.contains(&b.number) {
1365 None
1366 } else {
1367 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1368 }
1369 })
1370 .collect::<Vec<_>>();
1371
1372 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1373 assert_eq!(res, expected);
1374
1375 let expected = blocks
1376 .iter()
1377 .cloned()
1378 .map(|b| {
1381 if first_missing_range.contains(&b.number) ||
1382 second_missing_range.contains(&b.number)
1383 {
1384 None
1385 } else {
1386 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1387 }
1388 })
1389 .collect::<Vec<_>>();
1390
1391 let hashes = blocks.iter().map(|b| b.hash()).collect();
1392 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1393 assert_eq!(res, expected);
1394 }
1395 }
1396}