1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use reth_chainspec::EthereumHardforks;
20use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
21use reth_network_api::NetworkInfo;
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
25 PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{
33 sync::Arc,
34 time::{Instant, SystemTime},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace, warn};
38
39pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
41
42const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
44
45const MAX_BLOB_LIMIT: usize = 128;
47
48pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
64 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
65}
66
67impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
68 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
69{
70 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
72 &self.inner.chain_spec
73 }
74}
75
76impl<Provider, PayloadT, Pool, Validator, ChainSpec>
77 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
78where
79 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
80 PayloadT: PayloadTypes,
81 Pool: TransactionPool + 'static,
82 Validator: EngineApiValidator<PayloadT>,
83 ChainSpec: EthereumHardforks + Send + Sync + 'static,
84{
85 #[expect(clippy::too_many_arguments)]
87 pub fn new(
88 provider: Provider,
89 chain_spec: Arc<ChainSpec>,
90 beacon_consensus: ConsensusEngineHandle<PayloadT>,
91 payload_store: PayloadStore<PayloadT>,
92 tx_pool: Pool,
93 task_spawner: Box<dyn TaskSpawner>,
94 client: ClientVersionV1,
95 capabilities: EngineCapabilities,
96 validator: Validator,
97 accept_execution_requests_hash: bool,
98 network: impl NetworkInfo + 'static,
99 ) -> Self {
100 let is_syncing = Arc::new(move || network.is_syncing());
101 let inner = Arc::new(EngineApiInner {
102 provider,
103 chain_spec,
104 beacon_consensus,
105 payload_store,
106 task_spawner,
107 metrics: EngineApiMetrics::default(),
108 client,
109 capabilities,
110 tx_pool,
111 validator,
112 accept_execution_requests_hash,
113 is_syncing,
114 });
115 Self { inner }
116 }
117
118 pub fn get_client_version_v1(
120 &self,
121 _client: ClientVersionV1,
122 ) -> EngineApiResult<Vec<ClientVersionV1>> {
123 Ok(vec![self.inner.client.clone()])
124 }
125
126 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
128 Ok(self
129 .inner
130 .payload_store
131 .payload_timestamp(payload_id)
132 .await
133 .ok_or(EngineApiError::UnknownPayload)??)
134 }
135
136 pub async fn new_payload_v1(
139 &self,
140 payload: PayloadT::ExecutionData,
141 ) -> EngineApiResult<PayloadStatus> {
142 let payload_or_attrs = PayloadOrAttributes::<
143 '_,
144 PayloadT::ExecutionData,
145 PayloadT::PayloadAttributes,
146 >::from_execution_payload(&payload);
147
148 self.inner
149 .validator
150 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
151
152 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
153 }
154
155 pub async fn new_payload_v1_metered(
157 &self,
158 payload: PayloadT::ExecutionData,
159 ) -> EngineApiResult<PayloadStatus> {
160 let start = Instant::now();
161 let res = Self::new_payload_v1(self, payload).await;
162 let elapsed = start.elapsed();
163 self.inner.metrics.latency.new_payload_v1.record(elapsed);
164 res
165 }
166
167 pub async fn new_payload_v2(
169 &self,
170 payload: PayloadT::ExecutionData,
171 ) -> EngineApiResult<PayloadStatus> {
172 let payload_or_attrs = PayloadOrAttributes::<
173 '_,
174 PayloadT::ExecutionData,
175 PayloadT::PayloadAttributes,
176 >::from_execution_payload(&payload);
177 self.inner
178 .validator
179 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
180 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
181 }
182
183 pub async fn new_payload_v2_metered(
185 &self,
186 payload: PayloadT::ExecutionData,
187 ) -> EngineApiResult<PayloadStatus> {
188 let start = Instant::now();
189 let res = Self::new_payload_v2(self, payload).await;
190 let elapsed = start.elapsed();
191 self.inner.metrics.latency.new_payload_v2.record(elapsed);
192 res
193 }
194
195 pub async fn new_payload_v3(
197 &self,
198 payload: PayloadT::ExecutionData,
199 ) -> EngineApiResult<PayloadStatus> {
200 let payload_or_attrs = PayloadOrAttributes::<
201 '_,
202 PayloadT::ExecutionData,
203 PayloadT::PayloadAttributes,
204 >::from_execution_payload(&payload);
205 self.inner
206 .validator
207 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
208
209 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
210 }
211
212 pub async fn new_payload_v3_metered(
214 &self,
215 payload: PayloadT::ExecutionData,
216 ) -> RpcResult<PayloadStatus> {
217 let start = Instant::now();
218
219 let res = Self::new_payload_v3(self, payload).await;
220 let elapsed = start.elapsed();
221 self.inner.metrics.latency.new_payload_v3.record(elapsed);
222 Ok(res?)
223 }
224
225 pub async fn new_payload_v4(
227 &self,
228 payload: PayloadT::ExecutionData,
229 ) -> EngineApiResult<PayloadStatus> {
230 let payload_or_attrs = PayloadOrAttributes::<
231 '_,
232 PayloadT::ExecutionData,
233 PayloadT::PayloadAttributes,
234 >::from_execution_payload(&payload);
235 self.inner
236 .validator
237 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
238
239 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
240 }
241
242 pub async fn new_payload_v4_metered(
244 &self,
245 payload: PayloadT::ExecutionData,
246 ) -> RpcResult<PayloadStatus> {
247 let start = Instant::now();
248 let res = Self::new_payload_v4(self, payload).await;
249
250 let elapsed = start.elapsed();
251 self.inner.metrics.latency.new_payload_v4.record(elapsed);
252 Ok(res?)
253 }
254
255 pub fn accept_execution_requests_hash(&self) -> bool {
257 self.inner.accept_execution_requests_hash
258 }
259}
260
261impl<Provider, EngineT, Pool, Validator, ChainSpec>
262 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
263where
264 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
265 EngineT: EngineTypes,
266 Pool: TransactionPool + 'static,
267 Validator: EngineApiValidator<EngineT>,
268 ChainSpec: EthereumHardforks + Send + Sync + 'static,
269{
270 pub async fn fork_choice_updated_v1(
277 &self,
278 state: ForkchoiceState,
279 payload_attrs: Option<EngineT::PayloadAttributes>,
280 ) -> EngineApiResult<ForkchoiceUpdated> {
281 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
282 .await
283 }
284
285 pub async fn fork_choice_updated_v1_metered(
287 &self,
288 state: ForkchoiceState,
289 payload_attrs: Option<EngineT::PayloadAttributes>,
290 ) -> EngineApiResult<ForkchoiceUpdated> {
291 let start = Instant::now();
292 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
293 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
294 res
295 }
296
297 pub async fn fork_choice_updated_v2(
302 &self,
303 state: ForkchoiceState,
304 payload_attrs: Option<EngineT::PayloadAttributes>,
305 ) -> EngineApiResult<ForkchoiceUpdated> {
306 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
307 .await
308 }
309
310 pub async fn fork_choice_updated_v2_metered(
312 &self,
313 state: ForkchoiceState,
314 payload_attrs: Option<EngineT::PayloadAttributes>,
315 ) -> EngineApiResult<ForkchoiceUpdated> {
316 let start = Instant::now();
317 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
318 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
319 res
320 }
321
322 pub async fn fork_choice_updated_v3(
327 &self,
328 state: ForkchoiceState,
329 payload_attrs: Option<EngineT::PayloadAttributes>,
330 ) -> EngineApiResult<ForkchoiceUpdated> {
331 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
332 .await
333 }
334
335 pub async fn fork_choice_updated_v3_metered(
337 &self,
338 state: ForkchoiceState,
339 payload_attrs: Option<EngineT::PayloadAttributes>,
340 ) -> EngineApiResult<ForkchoiceUpdated> {
341 let start = Instant::now();
342 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
343 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
344 res
345 }
346
347 async fn get_built_payload(
349 &self,
350 payload_id: PayloadId,
351 ) -> EngineApiResult<EngineT::BuiltPayload> {
352 self.inner
353 .payload_store
354 .resolve(payload_id)
355 .await
356 .ok_or(EngineApiError::UnknownPayload)?
357 .map_err(|_| EngineApiError::UnknownPayload)
358 }
359
360 async fn get_payload_inner<R>(
363 &self,
364 payload_id: PayloadId,
365 version: EngineApiMessageVersion,
366 ) -> EngineApiResult<R>
367 where
368 EngineT::BuiltPayload: TryInto<R>,
369 {
370 let timestamp = self.get_payload_timestamp(payload_id).await?;
373 validate_payload_timestamp(
374 &self.inner.chain_spec,
375 version,
376 timestamp,
377 MessageValidationKind::GetPayload,
378 )?;
379
380 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
382 warn!(?version, "could not transform built payload");
383 EngineApiError::UnknownPayload
384 })
385 }
386
387 pub async fn get_payload_v1(
397 &self,
398 payload_id: PayloadId,
399 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
400 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
401 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
402 EngineApiError::UnknownPayload
403 })
404 }
405
406 pub async fn get_payload_v1_metered(
408 &self,
409 payload_id: PayloadId,
410 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
411 let start = Instant::now();
412 let res = Self::get_payload_v1(self, payload_id).await;
413 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
414 res
415 }
416
417 pub async fn get_payload_v2(
425 &self,
426 payload_id: PayloadId,
427 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
428 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
429 }
430
431 pub async fn get_payload_v2_metered(
433 &self,
434 payload_id: PayloadId,
435 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
436 let start = Instant::now();
437 let res = Self::get_payload_v2(self, payload_id).await;
438 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
439 res
440 }
441
442 pub async fn get_payload_v3(
450 &self,
451 payload_id: PayloadId,
452 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
453 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
454 }
455
456 pub async fn get_payload_v3_metered(
458 &self,
459 payload_id: PayloadId,
460 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
461 let start = Instant::now();
462 let res = Self::get_payload_v3(self, payload_id).await;
463 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
464 res
465 }
466
467 pub async fn get_payload_v4(
475 &self,
476 payload_id: PayloadId,
477 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
478 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
479 }
480
481 pub async fn get_payload_v4_metered(
483 &self,
484 payload_id: PayloadId,
485 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
486 let start = Instant::now();
487 let res = Self::get_payload_v4(self, payload_id).await;
488 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
489 res
490 }
491
492 pub async fn get_payload_v5(
502 &self,
503 payload_id: PayloadId,
504 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
505 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
506 }
507
508 pub async fn get_payload_v5_metered(
510 &self,
511 payload_id: PayloadId,
512 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
513 let start = Instant::now();
514 let res = Self::get_payload_v5(self, payload_id).await;
515 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
516 res
517 }
518
519 pub async fn get_payload_bodies_by_range_with<F, R>(
522 &self,
523 start: BlockNumber,
524 count: u64,
525 f: F,
526 ) -> EngineApiResult<Vec<Option<R>>>
527 where
528 F: Fn(Provider::Block) -> R + Send + 'static,
529 R: Send + 'static,
530 {
531 let (tx, rx) = oneshot::channel();
532 let inner = self.inner.clone();
533
534 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
535 if count > MAX_PAYLOAD_BODIES_LIMIT {
536 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
537 return;
538 }
539
540 if start == 0 || count == 0 {
541 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
542 return;
543 }
544
545 let mut result = Vec::with_capacity(count as usize);
546
547 let mut end = start.saturating_add(count - 1);
549
550 if let Ok(best_block) = inner.provider.best_block_number()
553 && end > best_block {
554 end = best_block;
555 }
556
557 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
559 for num in start..=end {
560 if num < earliest_block {
561 result.push(None);
562 continue;
563 }
564 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
565 match block_result {
566 Ok(block) => {
567 result.push(block.map(&f));
568 }
569 Err(err) => {
570 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
571 return;
572 }
573 };
574 }
575 tx.send(Ok(result)).ok();
576 }));
577
578 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
579 }
580
581 pub async fn get_payload_bodies_by_range_v1(
592 &self,
593 start: BlockNumber,
594 count: u64,
595 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
596 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
597 transactions: block.body().encoded_2718_transactions(),
598 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
599 })
600 .await
601 }
602
603 pub async fn get_payload_bodies_by_range_v1_metered(
605 &self,
606 start: BlockNumber,
607 count: u64,
608 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
609 let start_time = Instant::now();
610 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
611 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
612 res
613 }
614
615 pub async fn get_payload_bodies_by_hash_with<F, R>(
617 &self,
618 hashes: Vec<BlockHash>,
619 f: F,
620 ) -> EngineApiResult<Vec<Option<R>>>
621 where
622 F: Fn(Provider::Block) -> R + Send + 'static,
623 R: Send + 'static,
624 {
625 let len = hashes.len() as u64;
626 if len > MAX_PAYLOAD_BODIES_LIMIT {
627 return Err(EngineApiError::PayloadRequestTooLarge { len });
628 }
629
630 let (tx, rx) = oneshot::channel();
631 let inner = self.inner.clone();
632
633 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
634 let mut result = Vec::with_capacity(hashes.len());
635 for hash in hashes {
636 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
637 match block_result {
638 Ok(block) => {
639 result.push(block.map(&f));
640 }
641 Err(err) => {
642 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
643 return;
644 }
645 }
646 }
647 tx.send(Ok(result)).ok();
648 }));
649
650 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
651 }
652
653 pub async fn get_payload_bodies_by_hash_v1(
655 &self,
656 hashes: Vec<BlockHash>,
657 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
658 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
659 transactions: block.body().encoded_2718_transactions(),
660 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
661 })
662 .await
663 }
664
665 pub async fn get_payload_bodies_by_hash_v1_metered(
667 &self,
668 hashes: Vec<BlockHash>,
669 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
670 let start = Instant::now();
671 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
672 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
673 res
674 }
675
676 async fn validate_and_execute_forkchoice(
690 &self,
691 version: EngineApiMessageVersion,
692 state: ForkchoiceState,
693 payload_attrs: Option<EngineT::PayloadAttributes>,
694 ) -> EngineApiResult<ForkchoiceUpdated> {
695 if let Some(ref attrs) = payload_attrs {
696 let attr_validation_res =
697 self.inner.validator.ensure_well_formed_attributes(version, attrs);
698
699 if let Err(err) = attr_validation_res {
713 let fcu_res =
714 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
715 if fcu_res.is_invalid() {
718 return Ok(fcu_res)
719 }
720 return Err(err.into())
721 }
722 }
723
724 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
725 }
726
727 pub fn capabilities(&self) -> &EngineCapabilities {
729 &self.inner.capabilities
730 }
731
732 fn get_blobs_v1(
733 &self,
734 versioned_hashes: Vec<B256>,
735 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
736 let current_timestamp =
738 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
739 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
740 return Err(EngineApiError::EngineObjectValidationError(
741 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
742 ));
743 }
744
745 if versioned_hashes.len() > MAX_BLOB_LIMIT {
746 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
747 }
748
749 self.inner
750 .tx_pool
751 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
752 .map_err(|err| EngineApiError::Internal(Box::new(err)))
753 }
754
755 pub fn get_blobs_v1_metered(
757 &self,
758 versioned_hashes: Vec<B256>,
759 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
760 let hashes_len = versioned_hashes.len();
761 let start = Instant::now();
762 let res = Self::get_blobs_v1(self, versioned_hashes);
763 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
764
765 if let Ok(blobs) = &res {
766 let blobs_found = blobs.iter().flatten().count();
767 let blobs_missed = hashes_len - blobs_found;
768
769 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
770 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
771 }
772
773 res
774 }
775
776 fn get_blobs_v2(
777 &self,
778 versioned_hashes: Vec<B256>,
779 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
780 let current_timestamp =
782 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
783 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
784 return Err(EngineApiError::EngineObjectValidationError(
785 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
786 ));
787 }
788
789 if versioned_hashes.len() > MAX_BLOB_LIMIT {
790 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
791 }
792
793 self.inner
794 .tx_pool
795 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
796 .map_err(|err| EngineApiError::Internal(Box::new(err)))
797 }
798
799 fn get_blobs_v3(
800 &self,
801 versioned_hashes: Vec<B256>,
802 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
803 let current_timestamp =
805 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
806 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
807 return Err(EngineApiError::EngineObjectValidationError(
808 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
809 ));
810 }
811
812 if versioned_hashes.len() > MAX_BLOB_LIMIT {
813 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
814 }
815
816 if (*self.inner.is_syncing)() {
818 return Ok(None)
819 }
820
821 self.inner
822 .tx_pool
823 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
824 .map(Some)
825 .map_err(|err| EngineApiError::Internal(Box::new(err)))
826 }
827
828 pub fn get_blobs_v2_metered(
830 &self,
831 versioned_hashes: Vec<B256>,
832 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
833 let hashes_len = versioned_hashes.len();
834 let start = Instant::now();
835 let res = Self::get_blobs_v2(self, versioned_hashes);
836 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
837
838 if let Ok(blobs) = &res {
839 let blobs_found = blobs.iter().flatten().count();
840
841 self.inner
842 .metrics
843 .blob_metrics
844 .get_blobs_requests_blobs_total
845 .increment(hashes_len as u64);
846 self.inner
847 .metrics
848 .blob_metrics
849 .get_blobs_requests_blobs_in_blobpool_total
850 .increment(blobs_found as u64);
851
852 if blobs_found == hashes_len {
853 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
854 } else {
855 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
856 }
857 } else {
858 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
859 }
860
861 res
862 }
863
864 pub fn get_blobs_v3_metered(
866 &self,
867 versioned_hashes: Vec<B256>,
868 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
869 let hashes_len = versioned_hashes.len();
870 let start = Instant::now();
871 let res = Self::get_blobs_v3(self, versioned_hashes);
872 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
873
874 if let Ok(Some(blobs)) = &res {
875 let blobs_found = blobs.iter().flatten().count();
876 let blobs_missed = hashes_len - blobs_found;
877
878 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
879 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
880 }
881
882 res
883 }
884}
885
886#[async_trait]
888impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
889 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
890where
891 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
892 EngineT: EngineTypes<ExecutionData = ExecutionData>,
893 Pool: TransactionPool + 'static,
894 Validator: EngineApiValidator<EngineT>,
895 ChainSpec: EthereumHardforks + Send + Sync + 'static,
896{
897 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
901 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
902 let payload =
903 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
904 Ok(self.new_payload_v1_metered(payload).await?)
905 }
906
907 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
910 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
911 let payload = ExecutionData {
912 payload: payload.into_payload(),
913 sidecar: ExecutionPayloadSidecar::none(),
914 };
915
916 Ok(self.new_payload_v2_metered(payload).await?)
917 }
918
919 async fn new_payload_v3(
922 &self,
923 payload: ExecutionPayloadV3,
924 versioned_hashes: Vec<B256>,
925 parent_beacon_block_root: B256,
926 ) -> RpcResult<PayloadStatus> {
927 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
928 let payload = ExecutionData {
929 payload: payload.into(),
930 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
931 versioned_hashes,
932 parent_beacon_block_root,
933 }),
934 };
935
936 Ok(self.new_payload_v3_metered(payload).await?)
937 }
938
939 async fn new_payload_v4(
942 &self,
943 payload: ExecutionPayloadV3,
944 versioned_hashes: Vec<B256>,
945 parent_beacon_block_root: B256,
946 requests: RequestsOrHash,
947 ) -> RpcResult<PayloadStatus> {
948 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
949
950 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
952 return Err(EngineApiError::UnexpectedRequestsHash.into());
953 }
954
955 let payload = ExecutionData {
956 payload: payload.into(),
957 sidecar: ExecutionPayloadSidecar::v4(
958 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
959 PraguePayloadFields { requests },
960 ),
961 };
962
963 Ok(self.new_payload_v4_metered(payload).await?)
964 }
965
966 async fn fork_choice_updated_v1(
971 &self,
972 fork_choice_state: ForkchoiceState,
973 payload_attributes: Option<EngineT::PayloadAttributes>,
974 ) -> RpcResult<ForkchoiceUpdated> {
975 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
976 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
977 }
978
979 async fn fork_choice_updated_v2(
982 &self,
983 fork_choice_state: ForkchoiceState,
984 payload_attributes: Option<EngineT::PayloadAttributes>,
985 ) -> RpcResult<ForkchoiceUpdated> {
986 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
987 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
988 }
989
990 async fn fork_choice_updated_v3(
994 &self,
995 fork_choice_state: ForkchoiceState,
996 payload_attributes: Option<EngineT::PayloadAttributes>,
997 ) -> RpcResult<ForkchoiceUpdated> {
998 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
999 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1000 }
1001
1002 async fn get_payload_v1(
1014 &self,
1015 payload_id: PayloadId,
1016 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1017 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1018 Ok(self.get_payload_v1_metered(payload_id).await?)
1019 }
1020
1021 async fn get_payload_v2(
1031 &self,
1032 payload_id: PayloadId,
1033 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1034 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1035 Ok(self.get_payload_v2_metered(payload_id).await?)
1036 }
1037
1038 async fn get_payload_v3(
1048 &self,
1049 payload_id: PayloadId,
1050 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1051 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1052 Ok(self.get_payload_v3_metered(payload_id).await?)
1053 }
1054
1055 async fn get_payload_v4(
1065 &self,
1066 payload_id: PayloadId,
1067 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1068 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1069 Ok(self.get_payload_v4_metered(payload_id).await?)
1070 }
1071
1072 async fn get_payload_v5(
1082 &self,
1083 payload_id: PayloadId,
1084 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1085 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1086 Ok(self.get_payload_v5_metered(payload_id).await?)
1087 }
1088
1089 async fn get_payload_bodies_by_hash_v1(
1092 &self,
1093 block_hashes: Vec<BlockHash>,
1094 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1095 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1096 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1097 }
1098
1099 async fn get_payload_bodies_by_range_v1(
1116 &self,
1117 start: U64,
1118 count: U64,
1119 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1120 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1121 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1122 }
1123
1124 async fn get_client_version_v1(
1128 &self,
1129 client: ClientVersionV1,
1130 ) -> RpcResult<Vec<ClientVersionV1>> {
1131 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1132 Ok(Self::get_client_version_v1(self, client)?)
1133 }
1134
1135 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1138 Ok(self.capabilities().list())
1139 }
1140
1141 async fn get_blobs_v1(
1142 &self,
1143 versioned_hashes: Vec<B256>,
1144 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1145 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1146 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1147 }
1148
1149 async fn get_blobs_v2(
1150 &self,
1151 versioned_hashes: Vec<B256>,
1152 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1153 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1154 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1155 }
1156
1157 async fn get_blobs_v3(
1158 &self,
1159 versioned_hashes: Vec<B256>,
1160 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1161 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1162 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1163 }
1164}
1165
1166impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1167 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1168where
1169 EngineT: EngineTypes,
1170 Self: EngineApiServer<EngineT>,
1171{
1172 fn into_rpc_module(self) -> RpcModule<()> {
1173 self.into_rpc().remove_context()
1174 }
1175}
1176
1177impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1178 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1179where
1180 PayloadT: PayloadTypes,
1181{
1182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1183 f.debug_struct("EngineApi").finish_non_exhaustive()
1184 }
1185}
1186
1187impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1188 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1189where
1190 PayloadT: PayloadTypes,
1191{
1192 fn clone(&self) -> Self {
1193 Self { inner: Arc::clone(&self.inner) }
1194 }
1195}
1196
1197struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1199 provider: Provider,
1201 chain_spec: Arc<ChainSpec>,
1203 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1205 payload_store: PayloadStore<PayloadT>,
1207 task_spawner: Box<dyn TaskSpawner>,
1209 metrics: EngineApiMetrics,
1211 client: ClientVersionV1,
1213 capabilities: EngineCapabilities,
1215 tx_pool: Pool,
1217 validator: Validator,
1219 accept_execution_requests_hash: bool,
1220 is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1222}
1223
1224#[cfg(test)]
1225mod tests {
1226 use super::*;
1227 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1228 use assert_matches::assert_matches;
1229 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1230 use reth_engine_primitives::BeaconEngineMessage;
1231 use reth_ethereum_engine_primitives::EthEngineTypes;
1232 use reth_ethereum_primitives::Block;
1233 use reth_network_api::{
1234 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1235 };
1236 use reth_node_ethereum::EthereumEngineValidator;
1237 use reth_payload_builder::test_utils::spawn_test_payload_service;
1238 use reth_provider::test_utils::MockEthProvider;
1239 use reth_tasks::TokioTaskExecutor;
1240 use reth_transaction_pool::noop::NoopTransactionPool;
1241 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1242
1243 fn setup_engine_api() -> (
1244 EngineApiTestHandle,
1245 EngineApi<
1246 Arc<MockEthProvider>,
1247 EthEngineTypes,
1248 NoopTransactionPool,
1249 EthereumEngineValidator,
1250 ChainSpec,
1251 >,
1252 ) {
1253 let client = ClientVersionV1 {
1254 code: ClientCode::RH,
1255 name: "Reth".to_string(),
1256 version: "v0.2.0-beta.5".to_string(),
1257 commit: "defa64b2".to_string(),
1258 };
1259
1260 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1261 let provider = Arc::new(MockEthProvider::default());
1262 let payload_store = spawn_test_payload_service();
1263 let (to_engine, engine_rx) = unbounded_channel();
1264 let task_executor = Box::<TokioTaskExecutor>::default();
1265 let api = EngineApi::new(
1266 provider.clone(),
1267 chain_spec.clone(),
1268 ConsensusEngineHandle::new(to_engine),
1269 payload_store.into(),
1270 NoopTransactionPool::default(),
1271 task_executor,
1272 client,
1273 EngineCapabilities::default(),
1274 EthereumEngineValidator::new(chain_spec.clone()),
1275 false,
1276 NoopNetwork::default(),
1277 );
1278 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1279 (handle, api)
1280 }
1281
1282 #[tokio::test]
1283 async fn engine_client_version_v1() {
1284 let client = ClientVersionV1 {
1285 code: ClientCode::RH,
1286 name: "Reth".to_string(),
1287 version: "v0.2.0-beta.5".to_string(),
1288 commit: "defa64b2".to_string(),
1289 };
1290 let (_, api) = setup_engine_api();
1291 let res = api.get_client_version_v1(client.clone());
1292 assert_eq!(res.unwrap(), vec![client]);
1293 }
1294
1295 struct EngineApiTestHandle {
1296 #[allow(dead_code)]
1297 chain_spec: Arc<ChainSpec>,
1298 provider: Arc<MockEthProvider>,
1299 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1300 }
1301
1302 #[tokio::test]
1303 async fn forwards_responses_to_consensus_engine() {
1304 let (mut handle, api) = setup_engine_api();
1305
1306 tokio::spawn(async move {
1307 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1308 let execution_data = ExecutionData {
1309 payload: payload_v1.into(),
1310 sidecar: ExecutionPayloadSidecar::none(),
1311 };
1312
1313 api.new_payload_v1(execution_data).await.unwrap();
1314 });
1315 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1316 }
1317
1318 #[derive(Clone)]
1319 struct TestNetworkInfo {
1320 syncing: bool,
1321 }
1322
1323 impl NetworkInfo for TestNetworkInfo {
1324 fn local_addr(&self) -> std::net::SocketAddr {
1325 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1326 }
1327
1328 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1329 #[allow(deprecated)]
1330 Ok(NetworkStatus {
1331 client_version: "test".to_string(),
1332 protocol_version: 5,
1333 eth_protocol_info: EthProtocolInfo {
1334 network: 1,
1335 difficulty: None,
1336 genesis: Default::default(),
1337 config: Default::default(),
1338 head: Default::default(),
1339 },
1340 capabilities: vec![],
1341 })
1342 }
1343
1344 fn chain_id(&self) -> u64 {
1345 1
1346 }
1347
1348 fn is_syncing(&self) -> bool {
1349 self.syncing
1350 }
1351
1352 fn is_initially_syncing(&self) -> bool {
1353 self.syncing
1354 }
1355 }
1356
1357 #[tokio::test]
1358 async fn get_blobs_v3_returns_null_when_syncing() {
1359 let chain_spec: Arc<ChainSpec> =
1360 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1361 let provider = Arc::new(MockEthProvider::default());
1362 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1363 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1364
1365 let api = EngineApi::new(
1366 provider,
1367 chain_spec.clone(),
1368 ConsensusEngineHandle::new(to_engine),
1369 payload_store.into(),
1370 NoopTransactionPool::default(),
1371 Box::<TokioTaskExecutor>::default(),
1372 ClientVersionV1 {
1373 code: ClientCode::RH,
1374 name: "Reth".to_string(),
1375 version: "v0.0.0-test".to_string(),
1376 commit: "test".to_string(),
1377 },
1378 EngineCapabilities::default(),
1379 EthereumEngineValidator::new(chain_spec),
1380 false,
1381 TestNetworkInfo { syncing: true },
1382 );
1383
1384 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1385 assert_matches!(res, Ok(None));
1386 }
1387
1388 mod get_payload_bodies {
1390 use super::*;
1391 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1392 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1393
1394 #[tokio::test]
1395 async fn invalid_params() {
1396 let (_, api) = setup_engine_api();
1397
1398 let by_range_tests = [
1399 (0, 0),
1401 (0, 1),
1402 (1, 0),
1403 ];
1404
1405 for (start, count) in by_range_tests {
1407 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1408 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1409 }
1410 }
1411
1412 #[tokio::test]
1413 async fn request_too_large() {
1414 let (_, api) = setup_engine_api();
1415
1416 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1417 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1418 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1419 }
1420
1421 #[tokio::test]
1422 async fn returns_payload_bodies() {
1423 let mut rng = generators::rng();
1424 let (handle, api) = setup_engine_api();
1425
1426 let (start, count) = (1, 10);
1427 let blocks = random_block_range(
1428 &mut rng,
1429 start..=start + count - 1,
1430 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1431 );
1432 handle
1433 .provider
1434 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1435
1436 let expected = blocks
1437 .iter()
1438 .cloned()
1439 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1440 .collect::<Vec<_>>();
1441
1442 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1443 assert_eq!(res, expected);
1444 }
1445
1446 #[tokio::test]
1447 async fn returns_payload_bodies_with_gaps() {
1448 let mut rng = generators::rng();
1449 let (handle, api) = setup_engine_api();
1450
1451 let (start, count) = (1, 100);
1452 let blocks = random_block_range(
1453 &mut rng,
1454 start..=start + count - 1,
1455 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1456 );
1457
1458 let first_missing_range = 26..=50;
1460 let second_missing_range = 76..=100;
1461 handle.provider.extend_blocks(
1462 blocks
1463 .iter()
1464 .filter(|b| {
1465 !first_missing_range.contains(&b.number) &&
1466 !second_missing_range.contains(&b.number)
1467 })
1468 .map(|b| (b.hash(), b.clone().into_block())),
1469 );
1470
1471 let expected = blocks
1472 .iter()
1473 .filter(|b| !second_missing_range.contains(&b.number))
1476 .cloned()
1477 .map(|b| {
1478 if first_missing_range.contains(&b.number) {
1479 None
1480 } else {
1481 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1482 }
1483 })
1484 .collect::<Vec<_>>();
1485
1486 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1487 assert_eq!(res, expected);
1488
1489 let expected = blocks
1490 .iter()
1491 .cloned()
1492 .map(|b| {
1495 if first_missing_range.contains(&b.number) ||
1496 second_missing_range.contains(&b.number)
1497 {
1498 None
1499 } else {
1500 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1501 }
1502 })
1503 .collect::<Vec<_>>();
1504
1505 let hashes = blocks.iter().map(|b| b.hash()).collect();
1506 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1507 assert_eq!(res, expected);
1508 }
1509 }
1510}