1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24 validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload, PayloadOrAttributes,
25 PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{sync::Arc, time::Instant};
33use tokio::sync::oneshot;
34use tracing::{debug, trace, warn};
35
36pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
38
39const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
41
42const MAX_BLOB_LIMIT: usize = 128;
44
45pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
61 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
62}
63
64impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
65 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
66{
67 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
69 &self.inner.chain_spec
70 }
71}
72
73impl<Provider, PayloadT, Pool, Validator, ChainSpec>
74 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
75where
76 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
77 PayloadT: PayloadTypes,
78 Pool: TransactionPool + 'static,
79 Validator: EngineApiValidator<PayloadT>,
80 ChainSpec: EthereumHardforks + Send + Sync + 'static,
81{
82 #[expect(clippy::too_many_arguments)]
84 pub fn new(
85 provider: Provider,
86 chain_spec: Arc<ChainSpec>,
87 beacon_consensus: ConsensusEngineHandle<PayloadT>,
88 payload_store: PayloadStore<PayloadT>,
89 tx_pool: Pool,
90 task_spawner: Box<dyn TaskSpawner>,
91 client: ClientVersionV1,
92 capabilities: EngineCapabilities,
93 validator: Validator,
94 accept_execution_requests_hash: bool,
95 ) -> Self {
96 let inner = Arc::new(EngineApiInner {
97 provider,
98 chain_spec,
99 beacon_consensus,
100 payload_store,
101 task_spawner,
102 metrics: EngineApiMetrics::default(),
103 client,
104 capabilities,
105 tx_pool,
106 validator,
107 latest_new_payload_response: Mutex::new(None),
108 accept_execution_requests_hash,
109 });
110 Self { inner }
111 }
112
113 pub fn get_client_version_v1(
115 &self,
116 _client: ClientVersionV1,
117 ) -> EngineApiResult<Vec<ClientVersionV1>> {
118 Ok(vec![self.inner.client.clone()])
119 }
120
121 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
123 Ok(self
124 .inner
125 .payload_store
126 .payload_timestamp(payload_id)
127 .await
128 .ok_or(EngineApiError::UnknownPayload)??)
129 }
130
131 pub async fn new_payload_v1(
134 &self,
135 payload: PayloadT::ExecutionData,
136 ) -> EngineApiResult<PayloadStatus> {
137 let payload_or_attrs = PayloadOrAttributes::<
138 '_,
139 PayloadT::ExecutionData,
140 PayloadT::PayloadAttributes,
141 >::from_execution_payload(&payload);
142
143 self.inner
144 .validator
145 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
146
147 Ok(self
148 .inner
149 .beacon_consensus
150 .new_payload(payload)
151 .await
152 .inspect(|_| self.inner.on_new_payload_response())?)
153 }
154
155 pub async fn new_payload_v1_metered(
157 &self,
158 payload: PayloadT::ExecutionData,
159 ) -> EngineApiResult<PayloadStatus> {
160 let start = Instant::now();
161 let gas_used = payload.gas_used();
162
163 let res = Self::new_payload_v1(self, payload).await;
164 let elapsed = start.elapsed();
165 self.inner.metrics.latency.new_payload_v1.record(elapsed);
166 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
167 res
168 }
169
170 pub async fn new_payload_v2(
172 &self,
173 payload: PayloadT::ExecutionData,
174 ) -> EngineApiResult<PayloadStatus> {
175 let payload_or_attrs = PayloadOrAttributes::<
176 '_,
177 PayloadT::ExecutionData,
178 PayloadT::PayloadAttributes,
179 >::from_execution_payload(&payload);
180 self.inner
181 .validator
182 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
183 Ok(self
184 .inner
185 .beacon_consensus
186 .new_payload(payload)
187 .await
188 .inspect(|_| self.inner.on_new_payload_response())?)
189 }
190
191 pub async fn new_payload_v2_metered(
193 &self,
194 payload: PayloadT::ExecutionData,
195 ) -> EngineApiResult<PayloadStatus> {
196 let start = Instant::now();
197 let gas_used = payload.gas_used();
198
199 let res = Self::new_payload_v2(self, payload).await;
200 let elapsed = start.elapsed();
201 self.inner.metrics.latency.new_payload_v2.record(elapsed);
202 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
203 res
204 }
205
206 pub async fn new_payload_v3(
208 &self,
209 payload: PayloadT::ExecutionData,
210 ) -> EngineApiResult<PayloadStatus> {
211 let payload_or_attrs = PayloadOrAttributes::<
212 '_,
213 PayloadT::ExecutionData,
214 PayloadT::PayloadAttributes,
215 >::from_execution_payload(&payload);
216 self.inner
217 .validator
218 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
219
220 Ok(self
221 .inner
222 .beacon_consensus
223 .new_payload(payload)
224 .await
225 .inspect(|_| self.inner.on_new_payload_response())?)
226 }
227
228 pub async fn new_payload_v3_metered(
230 &self,
231 payload: PayloadT::ExecutionData,
232 ) -> RpcResult<PayloadStatus> {
233 let start = Instant::now();
234 let gas_used = payload.gas_used();
235
236 let res = Self::new_payload_v3(self, payload).await;
237 let elapsed = start.elapsed();
238 self.inner.metrics.latency.new_payload_v3.record(elapsed);
239 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
240 Ok(res?)
241 }
242
243 pub async fn new_payload_v4(
245 &self,
246 payload: PayloadT::ExecutionData,
247 ) -> EngineApiResult<PayloadStatus> {
248 let payload_or_attrs = PayloadOrAttributes::<
249 '_,
250 PayloadT::ExecutionData,
251 PayloadT::PayloadAttributes,
252 >::from_execution_payload(&payload);
253 self.inner
254 .validator
255 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
256
257 Ok(self
258 .inner
259 .beacon_consensus
260 .new_payload(payload)
261 .await
262 .inspect(|_| self.inner.on_new_payload_response())?)
263 }
264
265 pub async fn new_payload_v4_metered(
267 &self,
268 payload: PayloadT::ExecutionData,
269 ) -> RpcResult<PayloadStatus> {
270 let start = Instant::now();
271 let gas_used = payload.gas_used();
272
273 let res = Self::new_payload_v4(self, payload).await;
274
275 let elapsed = start.elapsed();
276 self.inner.metrics.latency.new_payload_v4.record(elapsed);
277 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
278 Ok(res?)
279 }
280
281 pub fn accept_execution_requests_hash(&self) -> bool {
283 self.inner.accept_execution_requests_hash
284 }
285}
286
287impl<Provider, EngineT, Pool, Validator, ChainSpec>
288 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
289where
290 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
291 EngineT: EngineTypes,
292 Pool: TransactionPool + 'static,
293 Validator: EngineApiValidator<EngineT>,
294 ChainSpec: EthereumHardforks + Send + Sync + 'static,
295{
296 pub async fn fork_choice_updated_v1(
303 &self,
304 state: ForkchoiceState,
305 payload_attrs: Option<EngineT::PayloadAttributes>,
306 ) -> EngineApiResult<ForkchoiceUpdated> {
307 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
308 .await
309 }
310
311 pub async fn fork_choice_updated_v1_metered(
313 &self,
314 state: ForkchoiceState,
315 payload_attrs: Option<EngineT::PayloadAttributes>,
316 ) -> EngineApiResult<ForkchoiceUpdated> {
317 let start = Instant::now();
318 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
319 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
320 self.inner.metrics.fcu_response.update_response_metrics(&res);
321 res
322 }
323
324 pub async fn fork_choice_updated_v2(
329 &self,
330 state: ForkchoiceState,
331 payload_attrs: Option<EngineT::PayloadAttributes>,
332 ) -> EngineApiResult<ForkchoiceUpdated> {
333 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
334 .await
335 }
336
337 pub async fn fork_choice_updated_v2_metered(
339 &self,
340 state: ForkchoiceState,
341 payload_attrs: Option<EngineT::PayloadAttributes>,
342 ) -> EngineApiResult<ForkchoiceUpdated> {
343 let start = Instant::now();
344 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
345 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
346 self.inner.metrics.fcu_response.update_response_metrics(&res);
347 res
348 }
349
350 pub async fn fork_choice_updated_v3(
355 &self,
356 state: ForkchoiceState,
357 payload_attrs: Option<EngineT::PayloadAttributes>,
358 ) -> EngineApiResult<ForkchoiceUpdated> {
359 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
360 .await
361 }
362
363 pub async fn fork_choice_updated_v3_metered(
365 &self,
366 state: ForkchoiceState,
367 payload_attrs: Option<EngineT::PayloadAttributes>,
368 ) -> EngineApiResult<ForkchoiceUpdated> {
369 let start = Instant::now();
370 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
371 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
372 self.inner.metrics.fcu_response.update_response_metrics(&res);
373 res
374 }
375
376 async fn get_built_payload(
378 &self,
379 payload_id: PayloadId,
380 ) -> EngineApiResult<EngineT::BuiltPayload> {
381 self.inner
382 .payload_store
383 .resolve(payload_id)
384 .await
385 .ok_or(EngineApiError::UnknownPayload)?
386 .map_err(|_| EngineApiError::UnknownPayload)
387 }
388
389 async fn get_payload_inner<R>(
392 &self,
393 payload_id: PayloadId,
394 version: EngineApiMessageVersion,
395 ) -> EngineApiResult<R>
396 where
397 EngineT::BuiltPayload: TryInto<R>,
398 {
399 let timestamp = self.get_payload_timestamp(payload_id).await?;
401 validate_payload_timestamp(&self.inner.chain_spec, version, timestamp)?;
402
403 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
405 warn!(?version, "could not transform built payload");
406 EngineApiError::UnknownPayload
407 })
408 }
409
410 pub async fn get_payload_v1(
420 &self,
421 payload_id: PayloadId,
422 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
423 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
424 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
425 EngineApiError::UnknownPayload
426 })
427 }
428
429 pub async fn get_payload_v1_metered(
431 &self,
432 payload_id: PayloadId,
433 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
434 let start = Instant::now();
435 let res = Self::get_payload_v1(self, payload_id).await;
436 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
437 res
438 }
439
440 pub async fn get_payload_v2(
448 &self,
449 payload_id: PayloadId,
450 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
451 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
452 }
453
454 pub async fn get_payload_v2_metered(
456 &self,
457 payload_id: PayloadId,
458 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
459 let start = Instant::now();
460 let res = Self::get_payload_v2(self, payload_id).await;
461 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
462 res
463 }
464
465 pub async fn get_payload_v3(
473 &self,
474 payload_id: PayloadId,
475 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
476 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
477 }
478
479 pub async fn get_payload_v3_metered(
481 &self,
482 payload_id: PayloadId,
483 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
484 let start = Instant::now();
485 let res = Self::get_payload_v3(self, payload_id).await;
486 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
487 res
488 }
489
490 pub async fn get_payload_v4(
498 &self,
499 payload_id: PayloadId,
500 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
501 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
502 }
503
504 pub async fn get_payload_v4_metered(
506 &self,
507 payload_id: PayloadId,
508 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
509 let start = Instant::now();
510 let res = Self::get_payload_v4(self, payload_id).await;
511 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
512 res
513 }
514
515 pub async fn get_payload_v5(
525 &self,
526 payload_id: PayloadId,
527 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
528 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
529 }
530
531 pub async fn get_payload_v5_metered(
533 &self,
534 payload_id: PayloadId,
535 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
536 let start = Instant::now();
537 let res = Self::get_payload_v5(self, payload_id).await;
538 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
539 res
540 }
541
542 pub async fn get_payload_bodies_by_range_with<F, R>(
545 &self,
546 start: BlockNumber,
547 count: u64,
548 f: F,
549 ) -> EngineApiResult<Vec<Option<R>>>
550 where
551 F: Fn(Provider::Block) -> R + Send + 'static,
552 R: Send + 'static,
553 {
554 let (tx, rx) = oneshot::channel();
555 let inner = self.inner.clone();
556
557 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
558 if count > MAX_PAYLOAD_BODIES_LIMIT {
559 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
560 return;
561 }
562
563 if start == 0 || count == 0 {
564 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
565 return;
566 }
567
568 let mut result = Vec::with_capacity(count as usize);
569
570 let mut end = start.saturating_add(count - 1);
572
573 if let Ok(best_block) = inner.provider.best_block_number() {
576 if end > best_block {
577 end = best_block;
578 }
579 }
580
581 for num in start..=end {
582 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
583 match block_result {
584 Ok(block) => {
585 result.push(block.map(&f));
586 }
587 Err(err) => {
588 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
589 return;
590 }
591 };
592 }
593 tx.send(Ok(result)).ok();
594 }));
595
596 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
597 }
598
599 pub async fn get_payload_bodies_by_range_v1(
610 &self,
611 start: BlockNumber,
612 count: u64,
613 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
614 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
615 transactions: block.body().encoded_2718_transactions(),
616 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
617 })
618 .await
619 }
620
621 pub async fn get_payload_bodies_by_range_v1_metered(
623 &self,
624 start: BlockNumber,
625 count: u64,
626 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
627 let start_time = Instant::now();
628 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
629 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
630 res
631 }
632
633 pub async fn get_payload_bodies_by_hash_with<F, R>(
635 &self,
636 hashes: Vec<BlockHash>,
637 f: F,
638 ) -> EngineApiResult<Vec<Option<R>>>
639 where
640 F: Fn(Provider::Block) -> R + Send + 'static,
641 R: Send + 'static,
642 {
643 let len = hashes.len() as u64;
644 if len > MAX_PAYLOAD_BODIES_LIMIT {
645 return Err(EngineApiError::PayloadRequestTooLarge { len });
646 }
647
648 let (tx, rx) = oneshot::channel();
649 let inner = self.inner.clone();
650
651 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
652 let mut result = Vec::with_capacity(hashes.len());
653 for hash in hashes {
654 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
655 match block_result {
656 Ok(block) => {
657 result.push(block.map(&f));
658 }
659 Err(err) => {
660 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
661 return;
662 }
663 }
664 }
665 tx.send(Ok(result)).ok();
666 }));
667
668 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
669 }
670
671 pub async fn get_payload_bodies_by_hash_v1(
673 &self,
674 hashes: Vec<BlockHash>,
675 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
676 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
677 transactions: block.body().encoded_2718_transactions(),
678 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
679 })
680 .await
681 }
682
683 pub async fn get_payload_bodies_by_hash_v1_metered(
685 &self,
686 hashes: Vec<BlockHash>,
687 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
688 let start = Instant::now();
689 let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
690 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
691 res.await
692 }
693
694 async fn validate_and_execute_forkchoice(
708 &self,
709 version: EngineApiMessageVersion,
710 state: ForkchoiceState,
711 payload_attrs: Option<EngineT::PayloadAttributes>,
712 ) -> EngineApiResult<ForkchoiceUpdated> {
713 self.inner.record_elapsed_time_on_fcu();
714
715 if let Some(ref attrs) = payload_attrs {
716 let attr_validation_res =
717 self.inner.validator.ensure_well_formed_attributes(version, attrs);
718
719 if let Err(err) = attr_validation_res {
733 let fcu_res =
734 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
735 if fcu_res.is_invalid() {
738 return Ok(fcu_res)
739 }
740 return Err(err.into())
741 }
742 }
743
744 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
745 }
746
747 pub fn capabilities(&self) -> &EngineCapabilities {
749 &self.inner.capabilities
750 }
751
752 fn get_blobs_v1(
753 &self,
754 versioned_hashes: Vec<B256>,
755 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
756 if versioned_hashes.len() > MAX_BLOB_LIMIT {
757 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
758 }
759
760 self.inner
761 .tx_pool
762 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
763 .map_err(|err| EngineApiError::Internal(Box::new(err)))
764 }
765
766 pub fn get_blobs_v1_metered(
768 &self,
769 versioned_hashes: Vec<B256>,
770 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
771 let hashes_len = versioned_hashes.len();
772 let start = Instant::now();
773 let res = Self::get_blobs_v1(self, versioned_hashes);
774 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
775
776 if let Ok(blobs) = &res {
777 let blobs_found = blobs.iter().flatten().count();
778 let blobs_missed = hashes_len - blobs_found;
779
780 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
781 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
782 }
783
784 res
785 }
786
787 fn get_blobs_v2(
788 &self,
789 versioned_hashes: Vec<B256>,
790 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
791 if versioned_hashes.len() > MAX_BLOB_LIMIT {
792 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
793 }
794
795 self.inner
796 .tx_pool
797 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
798 .map_err(|err| EngineApiError::Internal(Box::new(err)))
799 }
800
801 pub fn get_blobs_v2_metered(
803 &self,
804 versioned_hashes: Vec<B256>,
805 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
806 let hashes_len = versioned_hashes.len();
807 let start = Instant::now();
808 let res = Self::get_blobs_v2(self, versioned_hashes);
809 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
810
811 if let Ok(blobs) = &res {
812 let blobs_found = blobs.iter().flatten().count();
813
814 self.inner
815 .metrics
816 .blob_metrics
817 .get_blobs_requests_blobs_total
818 .increment(hashes_len as u64);
819 self.inner
820 .metrics
821 .blob_metrics
822 .get_blobs_requests_blobs_in_blobpool_total
823 .increment(blobs_found as u64);
824
825 if blobs_found == hashes_len {
826 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
827 } else {
828 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
829 }
830 } else {
831 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
832 }
833
834 res
835 }
836}
837
838#[async_trait]
840impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
841 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
842where
843 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
844 EngineT: EngineTypes<ExecutionData = ExecutionData>,
845 Pool: TransactionPool + 'static,
846 Validator: EngineApiValidator<EngineT>,
847 ChainSpec: EthereumHardforks + Send + Sync + 'static,
848{
849 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
853 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
854 let payload =
855 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
856 Ok(self.new_payload_v1_metered(payload).await?)
857 }
858
859 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
862 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
863 let payload = ExecutionData {
864 payload: payload.into_payload(),
865 sidecar: ExecutionPayloadSidecar::none(),
866 };
867
868 Ok(self.new_payload_v2_metered(payload).await?)
869 }
870
871 async fn new_payload_v3(
874 &self,
875 payload: ExecutionPayloadV3,
876 versioned_hashes: Vec<B256>,
877 parent_beacon_block_root: B256,
878 ) -> RpcResult<PayloadStatus> {
879 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
880 let payload = ExecutionData {
881 payload: payload.into(),
882 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
883 versioned_hashes,
884 parent_beacon_block_root,
885 }),
886 };
887
888 Ok(self.new_payload_v3_metered(payload).await?)
889 }
890
891 async fn new_payload_v4(
894 &self,
895 payload: ExecutionPayloadV3,
896 versioned_hashes: Vec<B256>,
897 parent_beacon_block_root: B256,
898 requests: RequestsOrHash,
899 ) -> RpcResult<PayloadStatus> {
900 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
901
902 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
904 return Err(EngineApiError::UnexpectedRequestsHash.into());
905 }
906
907 let payload = ExecutionData {
908 payload: payload.into(),
909 sidecar: ExecutionPayloadSidecar::v4(
910 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
911 PraguePayloadFields { requests },
912 ),
913 };
914
915 Ok(self.new_payload_v4_metered(payload).await?)
916 }
917
918 async fn fork_choice_updated_v1(
923 &self,
924 fork_choice_state: ForkchoiceState,
925 payload_attributes: Option<EngineT::PayloadAttributes>,
926 ) -> RpcResult<ForkchoiceUpdated> {
927 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
928 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
929 }
930
931 async fn fork_choice_updated_v2(
934 &self,
935 fork_choice_state: ForkchoiceState,
936 payload_attributes: Option<EngineT::PayloadAttributes>,
937 ) -> RpcResult<ForkchoiceUpdated> {
938 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
939 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
940 }
941
942 async fn fork_choice_updated_v3(
946 &self,
947 fork_choice_state: ForkchoiceState,
948 payload_attributes: Option<EngineT::PayloadAttributes>,
949 ) -> RpcResult<ForkchoiceUpdated> {
950 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
951 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
952 }
953
954 async fn get_payload_v1(
966 &self,
967 payload_id: PayloadId,
968 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
969 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
970 Ok(self.get_payload_v1_metered(payload_id).await?)
971 }
972
973 async fn get_payload_v2(
983 &self,
984 payload_id: PayloadId,
985 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
986 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
987 Ok(self.get_payload_v2_metered(payload_id).await?)
988 }
989
990 async fn get_payload_v3(
1000 &self,
1001 payload_id: PayloadId,
1002 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1003 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1004 Ok(self.get_payload_v3_metered(payload_id).await?)
1005 }
1006
1007 async fn get_payload_v4(
1017 &self,
1018 payload_id: PayloadId,
1019 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1020 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1021 Ok(self.get_payload_v4_metered(payload_id).await?)
1022 }
1023
1024 async fn get_payload_v5(
1034 &self,
1035 payload_id: PayloadId,
1036 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1037 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1038 Ok(self.get_payload_v5_metered(payload_id).await?)
1039 }
1040
1041 async fn get_payload_bodies_by_hash_v1(
1044 &self,
1045 block_hashes: Vec<BlockHash>,
1046 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1047 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1048 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1049 }
1050
1051 async fn get_payload_bodies_by_range_v1(
1068 &self,
1069 start: U64,
1070 count: U64,
1071 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1072 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1073 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1074 }
1075
1076 async fn get_client_version_v1(
1080 &self,
1081 client: ClientVersionV1,
1082 ) -> RpcResult<Vec<ClientVersionV1>> {
1083 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1084 Ok(Self::get_client_version_v1(self, client)?)
1085 }
1086
1087 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1090 Ok(self.capabilities().list())
1091 }
1092
1093 async fn get_blobs_v1(
1094 &self,
1095 versioned_hashes: Vec<B256>,
1096 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1097 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1098 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1099 }
1100
1101 async fn get_blobs_v2(
1102 &self,
1103 versioned_hashes: Vec<B256>,
1104 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1105 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1106 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1107 }
1108}
1109
1110impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1111 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1112where
1113 EngineT: EngineTypes,
1114 Self: EngineApiServer<EngineT>,
1115{
1116 fn into_rpc_module(self) -> RpcModule<()> {
1117 self.into_rpc().remove_context()
1118 }
1119}
1120
1121impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1122 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1123where
1124 PayloadT: PayloadTypes,
1125{
1126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1127 f.debug_struct("EngineApi").finish_non_exhaustive()
1128 }
1129}
1130
1131impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1132 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1133where
1134 PayloadT: PayloadTypes,
1135{
1136 fn clone(&self) -> Self {
1137 Self { inner: Arc::clone(&self.inner) }
1138 }
1139}
1140
1141struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1143 provider: Provider,
1145 chain_spec: Arc<ChainSpec>,
1147 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1149 payload_store: PayloadStore<PayloadT>,
1151 task_spawner: Box<dyn TaskSpawner>,
1153 metrics: EngineApiMetrics,
1155 client: ClientVersionV1,
1157 capabilities: EngineCapabilities,
1159 tx_pool: Pool,
1161 validator: Validator,
1163 latest_new_payload_response: Mutex<Option<Instant>>,
1165 accept_execution_requests_hash: bool,
1166}
1167
1168impl<Provider, PayloadT, Pool, Validator, ChainSpec>
1169 EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
1170where
1171 PayloadT: PayloadTypes,
1172{
1173 fn record_elapsed_time_on_fcu(&self) {
1176 if let Some(start_time) = self.latest_new_payload_response.lock().take() {
1177 let elapsed_time = start_time.elapsed();
1178 self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
1179 }
1180 }
1181
1182 fn on_new_payload_response(&self) {
1184 self.latest_new_payload_response.lock().replace(Instant::now());
1185 }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190 use super::*;
1191 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1192 use assert_matches::assert_matches;
1193 use reth_chainspec::{ChainSpec, MAINNET};
1194 use reth_engine_primitives::BeaconEngineMessage;
1195 use reth_ethereum_engine_primitives::EthEngineTypes;
1196 use reth_ethereum_primitives::Block;
1197 use reth_node_ethereum::EthereumEngineValidator;
1198 use reth_payload_builder::test_utils::spawn_test_payload_service;
1199 use reth_provider::test_utils::MockEthProvider;
1200 use reth_tasks::TokioTaskExecutor;
1201 use reth_transaction_pool::noop::NoopTransactionPool;
1202 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1203
1204 fn setup_engine_api() -> (
1205 EngineApiTestHandle,
1206 EngineApi<
1207 Arc<MockEthProvider>,
1208 EthEngineTypes,
1209 NoopTransactionPool,
1210 EthereumEngineValidator,
1211 ChainSpec,
1212 >,
1213 ) {
1214 let client = ClientVersionV1 {
1215 code: ClientCode::RH,
1216 name: "Reth".to_string(),
1217 version: "v0.2.0-beta.5".to_string(),
1218 commit: "defa64b2".to_string(),
1219 };
1220
1221 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1222 let provider = Arc::new(MockEthProvider::default());
1223 let payload_store = spawn_test_payload_service();
1224 let (to_engine, engine_rx) = unbounded_channel();
1225 let task_executor = Box::<TokioTaskExecutor>::default();
1226 let api = EngineApi::new(
1227 provider.clone(),
1228 chain_spec.clone(),
1229 ConsensusEngineHandle::new(to_engine),
1230 payload_store.into(),
1231 NoopTransactionPool::default(),
1232 task_executor,
1233 client,
1234 EngineCapabilities::default(),
1235 EthereumEngineValidator::new(chain_spec.clone()),
1236 false,
1237 );
1238 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1239 (handle, api)
1240 }
1241
1242 #[tokio::test]
1243 async fn engine_client_version_v1() {
1244 let client = ClientVersionV1 {
1245 code: ClientCode::RH,
1246 name: "Reth".to_string(),
1247 version: "v0.2.0-beta.5".to_string(),
1248 commit: "defa64b2".to_string(),
1249 };
1250 let (_, api) = setup_engine_api();
1251 let res = api.get_client_version_v1(client.clone());
1252 assert_eq!(res.unwrap(), vec![client]);
1253 }
1254
1255 struct EngineApiTestHandle {
1256 #[allow(dead_code)]
1257 chain_spec: Arc<ChainSpec>,
1258 provider: Arc<MockEthProvider>,
1259 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1260 }
1261
1262 #[tokio::test]
1263 async fn forwards_responses_to_consensus_engine() {
1264 let (mut handle, api) = setup_engine_api();
1265
1266 tokio::spawn(async move {
1267 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1268 let execution_data = ExecutionData {
1269 payload: payload_v1.into(),
1270 sidecar: ExecutionPayloadSidecar::none(),
1271 };
1272
1273 api.new_payload_v1(execution_data).await.unwrap();
1274 });
1275 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1276 }
1277
1278 mod get_payload_bodies {
1280 use super::*;
1281 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1282 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1283
1284 #[tokio::test]
1285 async fn invalid_params() {
1286 let (_, api) = setup_engine_api();
1287
1288 let by_range_tests = [
1289 (0, 0),
1291 (0, 1),
1292 (1, 0),
1293 ];
1294
1295 for (start, count) in by_range_tests {
1297 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1298 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1299 }
1300 }
1301
1302 #[tokio::test]
1303 async fn request_too_large() {
1304 let (_, api) = setup_engine_api();
1305
1306 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1307 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1308 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1309 }
1310
1311 #[tokio::test]
1312 async fn returns_payload_bodies() {
1313 let mut rng = generators::rng();
1314 let (handle, api) = setup_engine_api();
1315
1316 let (start, count) = (1, 10);
1317 let blocks = random_block_range(
1318 &mut rng,
1319 start..=start + count - 1,
1320 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1321 );
1322 handle
1323 .provider
1324 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1325
1326 let expected = blocks
1327 .iter()
1328 .cloned()
1329 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1330 .collect::<Vec<_>>();
1331
1332 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1333 assert_eq!(res, expected);
1334 }
1335
1336 #[tokio::test]
1337 async fn returns_payload_bodies_with_gaps() {
1338 let mut rng = generators::rng();
1339 let (handle, api) = setup_engine_api();
1340
1341 let (start, count) = (1, 100);
1342 let blocks = random_block_range(
1343 &mut rng,
1344 start..=start + count - 1,
1345 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1346 );
1347
1348 let first_missing_range = 26..=50;
1350 let second_missing_range = 76..=100;
1351 handle.provider.extend_blocks(
1352 blocks
1353 .iter()
1354 .filter(|b| {
1355 !first_missing_range.contains(&b.number) &&
1356 !second_missing_range.contains(&b.number)
1357 })
1358 .map(|b| (b.hash(), b.clone().into_block())),
1359 );
1360
1361 let expected = blocks
1362 .iter()
1363 .filter(|b| !second_missing_range.contains(&b.number))
1366 .cloned()
1367 .map(|b| {
1368 if first_missing_range.contains(&b.number) {
1369 None
1370 } else {
1371 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1372 }
1373 })
1374 .collect::<Vec<_>>();
1375
1376 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1377 assert_eq!(res, expected);
1378
1379 let expected = blocks
1380 .iter()
1381 .cloned()
1382 .map(|b| {
1385 if first_missing_range.contains(&b.number) ||
1386 second_missing_range.contains(&b.number)
1387 {
1388 None
1389 } else {
1390 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1391 }
1392 })
1393 .collect::<Vec<_>>();
1394
1395 let hashes = blocks.iter().map(|b| b.hash()).collect();
1396 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1397 assert_eq!(res, expected);
1398 }
1399 }
1400}