1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use reth_chainspec::EthereumHardforks;
20use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
21use reth_payload_builder::PayloadStore;
22use reth_payload_primitives::{
23 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
24 PayloadOrAttributes, PayloadTypes,
25};
26use reth_primitives_traits::{Block, BlockBody};
27use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
28use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
29use reth_tasks::TaskSpawner;
30use reth_transaction_pool::TransactionPool;
31use std::{
32 sync::Arc,
33 time::{Instant, SystemTime},
34};
35use tokio::sync::oneshot;
36use tracing::{debug, trace, warn};
37
38pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
40
/// Upper bound on how many payload bodies a single by-hash or by-range request may ask for.
///
/// Requests above this bound are rejected with `EngineApiError::PayloadRequestTooLarge`.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1_024;

/// Upper bound on how many versioned hashes a single blob request may contain.
///
/// Requests above this bound are rejected with `EngineApiError::BlobRequestTooLarge`.
const MAX_BLOB_LIMIT: usize = 128;
46
47pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
63 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
64}
65
66impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
67 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
68{
69 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
71 &self.inner.chain_spec
72 }
73}
74
75impl<Provider, PayloadT, Pool, Validator, ChainSpec>
76 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
77where
78 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
79 PayloadT: PayloadTypes,
80 Pool: TransactionPool + 'static,
81 Validator: EngineApiValidator<PayloadT>,
82 ChainSpec: EthereumHardforks + Send + Sync + 'static,
83{
84 #[expect(clippy::too_many_arguments)]
86 pub fn new(
87 provider: Provider,
88 chain_spec: Arc<ChainSpec>,
89 beacon_consensus: ConsensusEngineHandle<PayloadT>,
90 payload_store: PayloadStore<PayloadT>,
91 tx_pool: Pool,
92 task_spawner: Box<dyn TaskSpawner>,
93 client: ClientVersionV1,
94 capabilities: EngineCapabilities,
95 validator: Validator,
96 accept_execution_requests_hash: bool,
97 ) -> Self {
98 let inner = Arc::new(EngineApiInner {
99 provider,
100 chain_spec,
101 beacon_consensus,
102 payload_store,
103 task_spawner,
104 metrics: EngineApiMetrics::default(),
105 client,
106 capabilities,
107 tx_pool,
108 validator,
109 accept_execution_requests_hash,
110 });
111 Self { inner }
112 }
113
114 pub fn get_client_version_v1(
116 &self,
117 _client: ClientVersionV1,
118 ) -> EngineApiResult<Vec<ClientVersionV1>> {
119 Ok(vec![self.inner.client.clone()])
120 }
121
122 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
124 Ok(self
125 .inner
126 .payload_store
127 .payload_timestamp(payload_id)
128 .await
129 .ok_or(EngineApiError::UnknownPayload)??)
130 }
131
132 pub async fn new_payload_v1(
135 &self,
136 payload: PayloadT::ExecutionData,
137 ) -> EngineApiResult<PayloadStatus> {
138 let payload_or_attrs = PayloadOrAttributes::<
139 '_,
140 PayloadT::ExecutionData,
141 PayloadT::PayloadAttributes,
142 >::from_execution_payload(&payload);
143
144 self.inner
145 .validator
146 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
147
148 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
149 }
150
151 pub async fn new_payload_v1_metered(
153 &self,
154 payload: PayloadT::ExecutionData,
155 ) -> EngineApiResult<PayloadStatus> {
156 let start = Instant::now();
157 let res = Self::new_payload_v1(self, payload).await;
158 let elapsed = start.elapsed();
159 self.inner.metrics.latency.new_payload_v1.record(elapsed);
160 res
161 }
162
163 pub async fn new_payload_v2(
165 &self,
166 payload: PayloadT::ExecutionData,
167 ) -> EngineApiResult<PayloadStatus> {
168 let payload_or_attrs = PayloadOrAttributes::<
169 '_,
170 PayloadT::ExecutionData,
171 PayloadT::PayloadAttributes,
172 >::from_execution_payload(&payload);
173 self.inner
174 .validator
175 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
176 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
177 }
178
179 pub async fn new_payload_v2_metered(
181 &self,
182 payload: PayloadT::ExecutionData,
183 ) -> EngineApiResult<PayloadStatus> {
184 let start = Instant::now();
185 let res = Self::new_payload_v2(self, payload).await;
186 let elapsed = start.elapsed();
187 self.inner.metrics.latency.new_payload_v2.record(elapsed);
188 res
189 }
190
191 pub async fn new_payload_v3(
193 &self,
194 payload: PayloadT::ExecutionData,
195 ) -> EngineApiResult<PayloadStatus> {
196 let payload_or_attrs = PayloadOrAttributes::<
197 '_,
198 PayloadT::ExecutionData,
199 PayloadT::PayloadAttributes,
200 >::from_execution_payload(&payload);
201 self.inner
202 .validator
203 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
204
205 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
206 }
207
208 pub async fn new_payload_v3_metered(
210 &self,
211 payload: PayloadT::ExecutionData,
212 ) -> RpcResult<PayloadStatus> {
213 let start = Instant::now();
214
215 let res = Self::new_payload_v3(self, payload).await;
216 let elapsed = start.elapsed();
217 self.inner.metrics.latency.new_payload_v3.record(elapsed);
218 Ok(res?)
219 }
220
221 pub async fn new_payload_v4(
223 &self,
224 payload: PayloadT::ExecutionData,
225 ) -> EngineApiResult<PayloadStatus> {
226 let payload_or_attrs = PayloadOrAttributes::<
227 '_,
228 PayloadT::ExecutionData,
229 PayloadT::PayloadAttributes,
230 >::from_execution_payload(&payload);
231 self.inner
232 .validator
233 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
234
235 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
236 }
237
238 pub async fn new_payload_v4_metered(
240 &self,
241 payload: PayloadT::ExecutionData,
242 ) -> RpcResult<PayloadStatus> {
243 let start = Instant::now();
244 let res = Self::new_payload_v4(self, payload).await;
245
246 let elapsed = start.elapsed();
247 self.inner.metrics.latency.new_payload_v4.record(elapsed);
248 Ok(res?)
249 }
250
251 pub fn accept_execution_requests_hash(&self) -> bool {
253 self.inner.accept_execution_requests_hash
254 }
255}
256
257impl<Provider, EngineT, Pool, Validator, ChainSpec>
258 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
259where
260 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
261 EngineT: EngineTypes,
262 Pool: TransactionPool + 'static,
263 Validator: EngineApiValidator<EngineT>,
264 ChainSpec: EthereumHardforks + Send + Sync + 'static,
265{
266 pub async fn fork_choice_updated_v1(
273 &self,
274 state: ForkchoiceState,
275 payload_attrs: Option<EngineT::PayloadAttributes>,
276 ) -> EngineApiResult<ForkchoiceUpdated> {
277 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
278 .await
279 }
280
281 pub async fn fork_choice_updated_v1_metered(
283 &self,
284 state: ForkchoiceState,
285 payload_attrs: Option<EngineT::PayloadAttributes>,
286 ) -> EngineApiResult<ForkchoiceUpdated> {
287 let start = Instant::now();
288 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
289 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
290 res
291 }
292
293 pub async fn fork_choice_updated_v2(
298 &self,
299 state: ForkchoiceState,
300 payload_attrs: Option<EngineT::PayloadAttributes>,
301 ) -> EngineApiResult<ForkchoiceUpdated> {
302 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
303 .await
304 }
305
306 pub async fn fork_choice_updated_v2_metered(
308 &self,
309 state: ForkchoiceState,
310 payload_attrs: Option<EngineT::PayloadAttributes>,
311 ) -> EngineApiResult<ForkchoiceUpdated> {
312 let start = Instant::now();
313 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
314 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
315 res
316 }
317
318 pub async fn fork_choice_updated_v3(
323 &self,
324 state: ForkchoiceState,
325 payload_attrs: Option<EngineT::PayloadAttributes>,
326 ) -> EngineApiResult<ForkchoiceUpdated> {
327 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
328 .await
329 }
330
331 pub async fn fork_choice_updated_v3_metered(
333 &self,
334 state: ForkchoiceState,
335 payload_attrs: Option<EngineT::PayloadAttributes>,
336 ) -> EngineApiResult<ForkchoiceUpdated> {
337 let start = Instant::now();
338 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
339 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
340 res
341 }
342
343 async fn get_built_payload(
345 &self,
346 payload_id: PayloadId,
347 ) -> EngineApiResult<EngineT::BuiltPayload> {
348 self.inner
349 .payload_store
350 .resolve(payload_id)
351 .await
352 .ok_or(EngineApiError::UnknownPayload)?
353 .map_err(|_| EngineApiError::UnknownPayload)
354 }
355
356 async fn get_payload_inner<R>(
359 &self,
360 payload_id: PayloadId,
361 version: EngineApiMessageVersion,
362 ) -> EngineApiResult<R>
363 where
364 EngineT::BuiltPayload: TryInto<R>,
365 {
366 let timestamp = self.get_payload_timestamp(payload_id).await?;
369 validate_payload_timestamp(
370 &self.inner.chain_spec,
371 version,
372 timestamp,
373 MessageValidationKind::GetPayload,
374 )?;
375
376 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
378 warn!(?version, "could not transform built payload");
379 EngineApiError::UnknownPayload
380 })
381 }
382
383 pub async fn get_payload_v1(
393 &self,
394 payload_id: PayloadId,
395 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
396 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
397 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
398 EngineApiError::UnknownPayload
399 })
400 }
401
402 pub async fn get_payload_v1_metered(
404 &self,
405 payload_id: PayloadId,
406 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
407 let start = Instant::now();
408 let res = Self::get_payload_v1(self, payload_id).await;
409 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
410 res
411 }
412
413 pub async fn get_payload_v2(
421 &self,
422 payload_id: PayloadId,
423 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
424 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
425 }
426
427 pub async fn get_payload_v2_metered(
429 &self,
430 payload_id: PayloadId,
431 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
432 let start = Instant::now();
433 let res = Self::get_payload_v2(self, payload_id).await;
434 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
435 res
436 }
437
438 pub async fn get_payload_v3(
446 &self,
447 payload_id: PayloadId,
448 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
449 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
450 }
451
452 pub async fn get_payload_v3_metered(
454 &self,
455 payload_id: PayloadId,
456 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
457 let start = Instant::now();
458 let res = Self::get_payload_v3(self, payload_id).await;
459 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
460 res
461 }
462
463 pub async fn get_payload_v4(
471 &self,
472 payload_id: PayloadId,
473 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
474 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
475 }
476
477 pub async fn get_payload_v4_metered(
479 &self,
480 payload_id: PayloadId,
481 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
482 let start = Instant::now();
483 let res = Self::get_payload_v4(self, payload_id).await;
484 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
485 res
486 }
487
488 pub async fn get_payload_v5(
498 &self,
499 payload_id: PayloadId,
500 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
501 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
502 }
503
504 pub async fn get_payload_v5_metered(
506 &self,
507 payload_id: PayloadId,
508 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
509 let start = Instant::now();
510 let res = Self::get_payload_v5(self, payload_id).await;
511 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
512 res
513 }
514
515 pub async fn get_payload_bodies_by_range_with<F, R>(
518 &self,
519 start: BlockNumber,
520 count: u64,
521 f: F,
522 ) -> EngineApiResult<Vec<Option<R>>>
523 where
524 F: Fn(Provider::Block) -> R + Send + 'static,
525 R: Send + 'static,
526 {
527 let (tx, rx) = oneshot::channel();
528 let inner = self.inner.clone();
529
530 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
531 if count > MAX_PAYLOAD_BODIES_LIMIT {
532 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
533 return;
534 }
535
536 if start == 0 || count == 0 {
537 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
538 return;
539 }
540
541 let mut result = Vec::with_capacity(count as usize);
542
543 let mut end = start.saturating_add(count - 1);
545
546 if let Ok(best_block) = inner.provider.best_block_number()
549 && end > best_block {
550 end = best_block;
551 }
552
553 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
555 for num in start..=end {
556 if num < earliest_block {
557 result.push(None);
558 continue;
559 }
560 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
561 match block_result {
562 Ok(block) => {
563 result.push(block.map(&f));
564 }
565 Err(err) => {
566 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
567 return;
568 }
569 };
570 }
571 tx.send(Ok(result)).ok();
572 }));
573
574 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
575 }
576
577 pub async fn get_payload_bodies_by_range_v1(
588 &self,
589 start: BlockNumber,
590 count: u64,
591 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
592 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
593 transactions: block.body().encoded_2718_transactions(),
594 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
595 })
596 .await
597 }
598
599 pub async fn get_payload_bodies_by_range_v1_metered(
601 &self,
602 start: BlockNumber,
603 count: u64,
604 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
605 let start_time = Instant::now();
606 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
607 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
608 res
609 }
610
611 pub async fn get_payload_bodies_by_hash_with<F, R>(
613 &self,
614 hashes: Vec<BlockHash>,
615 f: F,
616 ) -> EngineApiResult<Vec<Option<R>>>
617 where
618 F: Fn(Provider::Block) -> R + Send + 'static,
619 R: Send + 'static,
620 {
621 let len = hashes.len() as u64;
622 if len > MAX_PAYLOAD_BODIES_LIMIT {
623 return Err(EngineApiError::PayloadRequestTooLarge { len });
624 }
625
626 let (tx, rx) = oneshot::channel();
627 let inner = self.inner.clone();
628
629 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
630 let mut result = Vec::with_capacity(hashes.len());
631 for hash in hashes {
632 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
633 match block_result {
634 Ok(block) => {
635 result.push(block.map(&f));
636 }
637 Err(err) => {
638 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
639 return;
640 }
641 }
642 }
643 tx.send(Ok(result)).ok();
644 }));
645
646 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
647 }
648
649 pub async fn get_payload_bodies_by_hash_v1(
651 &self,
652 hashes: Vec<BlockHash>,
653 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
654 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
655 transactions: block.body().encoded_2718_transactions(),
656 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
657 })
658 .await
659 }
660
661 pub async fn get_payload_bodies_by_hash_v1_metered(
663 &self,
664 hashes: Vec<BlockHash>,
665 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
666 let start = Instant::now();
667 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
668 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
669 res
670 }
671
672 async fn validate_and_execute_forkchoice(
686 &self,
687 version: EngineApiMessageVersion,
688 state: ForkchoiceState,
689 payload_attrs: Option<EngineT::PayloadAttributes>,
690 ) -> EngineApiResult<ForkchoiceUpdated> {
691 if let Some(ref attrs) = payload_attrs {
692 let attr_validation_res =
693 self.inner.validator.ensure_well_formed_attributes(version, attrs);
694
695 if let Err(err) = attr_validation_res {
709 let fcu_res =
710 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
711 if fcu_res.is_invalid() {
714 return Ok(fcu_res)
715 }
716 return Err(err.into())
717 }
718 }
719
720 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
721 }
722
723 pub fn capabilities(&self) -> &EngineCapabilities {
725 &self.inner.capabilities
726 }
727
728 fn get_blobs_v1(
729 &self,
730 versioned_hashes: Vec<B256>,
731 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
732 let current_timestamp =
734 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
735 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
736 return Err(EngineApiError::EngineObjectValidationError(
737 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
738 ));
739 }
740
741 if versioned_hashes.len() > MAX_BLOB_LIMIT {
742 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
743 }
744
745 self.inner
746 .tx_pool
747 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
748 .map_err(|err| EngineApiError::Internal(Box::new(err)))
749 }
750
751 pub fn get_blobs_v1_metered(
753 &self,
754 versioned_hashes: Vec<B256>,
755 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
756 let hashes_len = versioned_hashes.len();
757 let start = Instant::now();
758 let res = Self::get_blobs_v1(self, versioned_hashes);
759 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
760
761 if let Ok(blobs) = &res {
762 let blobs_found = blobs.iter().flatten().count();
763 let blobs_missed = hashes_len - blobs_found;
764
765 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
766 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
767 }
768
769 res
770 }
771
772 fn get_blobs_v2(
773 &self,
774 versioned_hashes: Vec<B256>,
775 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
776 let current_timestamp =
778 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
779 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
780 return Err(EngineApiError::EngineObjectValidationError(
781 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
782 ));
783 }
784
785 if versioned_hashes.len() > MAX_BLOB_LIMIT {
786 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
787 }
788
789 self.inner
790 .tx_pool
791 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
792 .map_err(|err| EngineApiError::Internal(Box::new(err)))
793 }
794
795 pub fn get_blobs_v2_metered(
797 &self,
798 versioned_hashes: Vec<B256>,
799 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
800 let hashes_len = versioned_hashes.len();
801 let start = Instant::now();
802 let res = Self::get_blobs_v2(self, versioned_hashes);
803 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
804
805 if let Ok(blobs) = &res {
806 let blobs_found = blobs.iter().flatten().count();
807
808 self.inner
809 .metrics
810 .blob_metrics
811 .get_blobs_requests_blobs_total
812 .increment(hashes_len as u64);
813 self.inner
814 .metrics
815 .blob_metrics
816 .get_blobs_requests_blobs_in_blobpool_total
817 .increment(blobs_found as u64);
818
819 if blobs_found == hashes_len {
820 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
821 } else {
822 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
823 }
824 } else {
825 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
826 }
827
828 res
829 }
830}
831
832#[async_trait]
834impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
835 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
836where
837 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
838 EngineT: EngineTypes<ExecutionData = ExecutionData>,
839 Pool: TransactionPool + 'static,
840 Validator: EngineApiValidator<EngineT>,
841 ChainSpec: EthereumHardforks + Send + Sync + 'static,
842{
843 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
847 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
848 let payload =
849 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
850 Ok(self.new_payload_v1_metered(payload).await?)
851 }
852
853 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
856 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
857 let payload = ExecutionData {
858 payload: payload.into_payload(),
859 sidecar: ExecutionPayloadSidecar::none(),
860 };
861
862 Ok(self.new_payload_v2_metered(payload).await?)
863 }
864
865 async fn new_payload_v3(
868 &self,
869 payload: ExecutionPayloadV3,
870 versioned_hashes: Vec<B256>,
871 parent_beacon_block_root: B256,
872 ) -> RpcResult<PayloadStatus> {
873 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
874 let payload = ExecutionData {
875 payload: payload.into(),
876 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
877 versioned_hashes,
878 parent_beacon_block_root,
879 }),
880 };
881
882 Ok(self.new_payload_v3_metered(payload).await?)
883 }
884
885 async fn new_payload_v4(
888 &self,
889 payload: ExecutionPayloadV3,
890 versioned_hashes: Vec<B256>,
891 parent_beacon_block_root: B256,
892 requests: RequestsOrHash,
893 ) -> RpcResult<PayloadStatus> {
894 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
895
896 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
898 return Err(EngineApiError::UnexpectedRequestsHash.into());
899 }
900
901 let payload = ExecutionData {
902 payload: payload.into(),
903 sidecar: ExecutionPayloadSidecar::v4(
904 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
905 PraguePayloadFields { requests },
906 ),
907 };
908
909 Ok(self.new_payload_v4_metered(payload).await?)
910 }
911
912 async fn fork_choice_updated_v1(
917 &self,
918 fork_choice_state: ForkchoiceState,
919 payload_attributes: Option<EngineT::PayloadAttributes>,
920 ) -> RpcResult<ForkchoiceUpdated> {
921 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
922 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
923 }
924
925 async fn fork_choice_updated_v2(
928 &self,
929 fork_choice_state: ForkchoiceState,
930 payload_attributes: Option<EngineT::PayloadAttributes>,
931 ) -> RpcResult<ForkchoiceUpdated> {
932 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
933 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
934 }
935
936 async fn fork_choice_updated_v3(
940 &self,
941 fork_choice_state: ForkchoiceState,
942 payload_attributes: Option<EngineT::PayloadAttributes>,
943 ) -> RpcResult<ForkchoiceUpdated> {
944 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
945 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
946 }
947
948 async fn get_payload_v1(
960 &self,
961 payload_id: PayloadId,
962 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
963 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
964 Ok(self.get_payload_v1_metered(payload_id).await?)
965 }
966
967 async fn get_payload_v2(
977 &self,
978 payload_id: PayloadId,
979 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
980 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
981 Ok(self.get_payload_v2_metered(payload_id).await?)
982 }
983
984 async fn get_payload_v3(
994 &self,
995 payload_id: PayloadId,
996 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
997 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
998 Ok(self.get_payload_v3_metered(payload_id).await?)
999 }
1000
1001 async fn get_payload_v4(
1011 &self,
1012 payload_id: PayloadId,
1013 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1014 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1015 Ok(self.get_payload_v4_metered(payload_id).await?)
1016 }
1017
1018 async fn get_payload_v5(
1028 &self,
1029 payload_id: PayloadId,
1030 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1031 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1032 Ok(self.get_payload_v5_metered(payload_id).await?)
1033 }
1034
1035 async fn get_payload_bodies_by_hash_v1(
1038 &self,
1039 block_hashes: Vec<BlockHash>,
1040 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1041 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1042 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1043 }
1044
1045 async fn get_payload_bodies_by_range_v1(
1062 &self,
1063 start: U64,
1064 count: U64,
1065 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1066 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1067 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1068 }
1069
1070 async fn get_client_version_v1(
1074 &self,
1075 client: ClientVersionV1,
1076 ) -> RpcResult<Vec<ClientVersionV1>> {
1077 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1078 Ok(Self::get_client_version_v1(self, client)?)
1079 }
1080
1081 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1084 Ok(self.capabilities().list())
1085 }
1086
1087 async fn get_blobs_v1(
1088 &self,
1089 versioned_hashes: Vec<B256>,
1090 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1091 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1092 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1093 }
1094
1095 async fn get_blobs_v2(
1096 &self,
1097 versioned_hashes: Vec<B256>,
1098 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1099 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1100 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1101 }
1102}
1103
1104impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1105 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1106where
1107 EngineT: EngineTypes,
1108 Self: EngineApiServer<EngineT>,
1109{
1110 fn into_rpc_module(self) -> RpcModule<()> {
1111 self.into_rpc().remove_context()
1112 }
1113}
1114
1115impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1116 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1117where
1118 PayloadT: PayloadTypes,
1119{
1120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1121 f.debug_struct("EngineApi").finish_non_exhaustive()
1122 }
1123}
1124
1125impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1126 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1127where
1128 PayloadT: PayloadTypes,
1129{
1130 fn clone(&self) -> Self {
1131 Self { inner: Arc::clone(&self.inner) }
1132 }
1133}
1134
1135struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1137 provider: Provider,
1139 chain_spec: Arc<ChainSpec>,
1141 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1143 payload_store: PayloadStore<PayloadT>,
1145 task_spawner: Box<dyn TaskSpawner>,
1147 metrics: EngineApiMetrics,
1149 client: ClientVersionV1,
1151 capabilities: EngineCapabilities,
1153 tx_pool: Pool,
1155 validator: Validator,
1157 accept_execution_requests_hash: bool,
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162 use super::*;
1163 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1164 use assert_matches::assert_matches;
1165 use reth_chainspec::{ChainSpec, MAINNET};
1166 use reth_engine_primitives::BeaconEngineMessage;
1167 use reth_ethereum_engine_primitives::EthEngineTypes;
1168 use reth_ethereum_primitives::Block;
1169 use reth_node_ethereum::EthereumEngineValidator;
1170 use reth_payload_builder::test_utils::spawn_test_payload_service;
1171 use reth_provider::test_utils::MockEthProvider;
1172 use reth_tasks::TokioTaskExecutor;
1173 use reth_transaction_pool::noop::NoopTransactionPool;
1174 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1175
1176 fn setup_engine_api() -> (
1177 EngineApiTestHandle,
1178 EngineApi<
1179 Arc<MockEthProvider>,
1180 EthEngineTypes,
1181 NoopTransactionPool,
1182 EthereumEngineValidator,
1183 ChainSpec,
1184 >,
1185 ) {
1186 let client = ClientVersionV1 {
1187 code: ClientCode::RH,
1188 name: "Reth".to_string(),
1189 version: "v0.2.0-beta.5".to_string(),
1190 commit: "defa64b2".to_string(),
1191 };
1192
1193 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1194 let provider = Arc::new(MockEthProvider::default());
1195 let payload_store = spawn_test_payload_service();
1196 let (to_engine, engine_rx) = unbounded_channel();
1197 let task_executor = Box::<TokioTaskExecutor>::default();
1198 let api = EngineApi::new(
1199 provider.clone(),
1200 chain_spec.clone(),
1201 ConsensusEngineHandle::new(to_engine),
1202 payload_store.into(),
1203 NoopTransactionPool::default(),
1204 task_executor,
1205 client,
1206 EngineCapabilities::default(),
1207 EthereumEngineValidator::new(chain_spec.clone()),
1208 false,
1209 );
1210 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1211 (handle, api)
1212 }
1213
1214 #[tokio::test]
1215 async fn engine_client_version_v1() {
1216 let client = ClientVersionV1 {
1217 code: ClientCode::RH,
1218 name: "Reth".to_string(),
1219 version: "v0.2.0-beta.5".to_string(),
1220 commit: "defa64b2".to_string(),
1221 };
1222 let (_, api) = setup_engine_api();
1223 let res = api.get_client_version_v1(client.clone());
1224 assert_eq!(res.unwrap(), vec![client]);
1225 }
1226
    /// Test-side handles returned by `setup_engine_api` alongside the API under test.
    struct EngineApiTestHandle {
        // Stored on the handle but not read by the tests in this module, hence the allowance.
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        /// Mock provider shared with the API; tests add blocks to it.
        provider: Arc<MockEthProvider>,
        /// Receiving end of the channel the API's consensus engine handle sends on.
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }
1233
1234 #[tokio::test]
1235 async fn forwards_responses_to_consensus_engine() {
1236 let (mut handle, api) = setup_engine_api();
1237
1238 tokio::spawn(async move {
1239 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1240 let execution_data = ExecutionData {
1241 payload: payload_v1.into(),
1242 sidecar: ExecutionPayloadSidecar::none(),
1243 };
1244
1245 api.new_payload_v1(execution_data).await.unwrap();
1246 });
1247 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1248 }
1249
1250 mod get_payload_bodies {
1252 use super::*;
1253 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1254 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1255
1256 #[tokio::test]
1257 async fn invalid_params() {
1258 let (_, api) = setup_engine_api();
1259
1260 let by_range_tests = [
1261 (0, 0),
1263 (0, 1),
1264 (1, 0),
1265 ];
1266
1267 for (start, count) in by_range_tests {
1269 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1270 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1271 }
1272 }
1273
1274 #[tokio::test]
1275 async fn request_too_large() {
1276 let (_, api) = setup_engine_api();
1277
1278 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1279 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1280 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1281 }
1282
1283 #[tokio::test]
1284 async fn returns_payload_bodies() {
1285 let mut rng = generators::rng();
1286 let (handle, api) = setup_engine_api();
1287
1288 let (start, count) = (1, 10);
1289 let blocks = random_block_range(
1290 &mut rng,
1291 start..=start + count - 1,
1292 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1293 );
1294 handle
1295 .provider
1296 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1297
1298 let expected = blocks
1299 .iter()
1300 .cloned()
1301 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1302 .collect::<Vec<_>>();
1303
1304 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1305 assert_eq!(res, expected);
1306 }
1307
1308 #[tokio::test]
1309 async fn returns_payload_bodies_with_gaps() {
1310 let mut rng = generators::rng();
1311 let (handle, api) = setup_engine_api();
1312
1313 let (start, count) = (1, 100);
1314 let blocks = random_block_range(
1315 &mut rng,
1316 start..=start + count - 1,
1317 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1318 );
1319
1320 let first_missing_range = 26..=50;
1322 let second_missing_range = 76..=100;
1323 handle.provider.extend_blocks(
1324 blocks
1325 .iter()
1326 .filter(|b| {
1327 !first_missing_range.contains(&b.number) &&
1328 !second_missing_range.contains(&b.number)
1329 })
1330 .map(|b| (b.hash(), b.clone().into_block())),
1331 );
1332
1333 let expected = blocks
1334 .iter()
1335 .filter(|b| !second_missing_range.contains(&b.number))
1338 .cloned()
1339 .map(|b| {
1340 if first_missing_range.contains(&b.number) {
1341 None
1342 } else {
1343 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1344 }
1345 })
1346 .collect::<Vec<_>>();
1347
1348 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1349 assert_eq!(res, expected);
1350
1351 let expected = blocks
1352 .iter()
1353 .cloned()
1354 .map(|b| {
1357 if first_missing_range.contains(&b.number) ||
1358 second_missing_range.contains(&b.number)
1359 {
1360 None
1361 } else {
1362 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1363 }
1364 })
1365 .collect::<Vec<_>>();
1366
1367 let hashes = blocks.iter().map(|b| b.hash()).collect();
1368 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1369 assert_eq!(res, expected);
1370 }
1371 }
1372}