1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24 validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload, PayloadOrAttributes,
25 PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{
33 sync::Arc,
34 time::{Instant, SystemTime},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace, warn};
38
39pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
41
42const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
44
45const MAX_BLOB_LIMIT: usize = 128;
47
48pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
64 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
65}
66
67impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
68 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
69{
70 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
72 &self.inner.chain_spec
73 }
74}
75
76impl<Provider, PayloadT, Pool, Validator, ChainSpec>
77 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
78where
79 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
80 PayloadT: PayloadTypes,
81 Pool: TransactionPool + 'static,
82 Validator: EngineApiValidator<PayloadT>,
83 ChainSpec: EthereumHardforks + Send + Sync + 'static,
84{
85 #[expect(clippy::too_many_arguments)]
87 pub fn new(
88 provider: Provider,
89 chain_spec: Arc<ChainSpec>,
90 beacon_consensus: ConsensusEngineHandle<PayloadT>,
91 payload_store: PayloadStore<PayloadT>,
92 tx_pool: Pool,
93 task_spawner: Box<dyn TaskSpawner>,
94 client: ClientVersionV1,
95 capabilities: EngineCapabilities,
96 validator: Validator,
97 accept_execution_requests_hash: bool,
98 ) -> Self {
99 let inner = Arc::new(EngineApiInner {
100 provider,
101 chain_spec,
102 beacon_consensus,
103 payload_store,
104 task_spawner,
105 metrics: EngineApiMetrics::default(),
106 client,
107 capabilities,
108 tx_pool,
109 validator,
110 latest_new_payload_response: Mutex::new(None),
111 accept_execution_requests_hash,
112 });
113 Self { inner }
114 }
115
116 pub fn get_client_version_v1(
118 &self,
119 _client: ClientVersionV1,
120 ) -> EngineApiResult<Vec<ClientVersionV1>> {
121 Ok(vec![self.inner.client.clone()])
122 }
123
124 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
126 Ok(self
127 .inner
128 .payload_store
129 .payload_timestamp(payload_id)
130 .await
131 .ok_or(EngineApiError::UnknownPayload)??)
132 }
133
134 pub async fn new_payload_v1(
137 &self,
138 payload: PayloadT::ExecutionData,
139 ) -> EngineApiResult<PayloadStatus> {
140 let payload_or_attrs = PayloadOrAttributes::<
141 '_,
142 PayloadT::ExecutionData,
143 PayloadT::PayloadAttributes,
144 >::from_execution_payload(&payload);
145
146 self.inner
147 .validator
148 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
149
150 Ok(self
151 .inner
152 .beacon_consensus
153 .new_payload(payload)
154 .await
155 .inspect(|_| self.inner.on_new_payload_response())?)
156 }
157
158 pub async fn new_payload_v1_metered(
160 &self,
161 payload: PayloadT::ExecutionData,
162 ) -> EngineApiResult<PayloadStatus> {
163 let start = Instant::now();
164 let gas_used = payload.gas_used();
165
166 let res = Self::new_payload_v1(self, payload).await;
167 let elapsed = start.elapsed();
168 self.inner.metrics.latency.new_payload_v1.record(elapsed);
169 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
170 res
171 }
172
173 pub async fn new_payload_v2(
175 &self,
176 payload: PayloadT::ExecutionData,
177 ) -> EngineApiResult<PayloadStatus> {
178 let payload_or_attrs = PayloadOrAttributes::<
179 '_,
180 PayloadT::ExecutionData,
181 PayloadT::PayloadAttributes,
182 >::from_execution_payload(&payload);
183 self.inner
184 .validator
185 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
186 Ok(self
187 .inner
188 .beacon_consensus
189 .new_payload(payload)
190 .await
191 .inspect(|_| self.inner.on_new_payload_response())?)
192 }
193
194 pub async fn new_payload_v2_metered(
196 &self,
197 payload: PayloadT::ExecutionData,
198 ) -> EngineApiResult<PayloadStatus> {
199 let start = Instant::now();
200 let gas_used = payload.gas_used();
201
202 let res = Self::new_payload_v2(self, payload).await;
203 let elapsed = start.elapsed();
204 self.inner.metrics.latency.new_payload_v2.record(elapsed);
205 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
206 res
207 }
208
209 pub async fn new_payload_v3(
211 &self,
212 payload: PayloadT::ExecutionData,
213 ) -> EngineApiResult<PayloadStatus> {
214 let payload_or_attrs = PayloadOrAttributes::<
215 '_,
216 PayloadT::ExecutionData,
217 PayloadT::PayloadAttributes,
218 >::from_execution_payload(&payload);
219 self.inner
220 .validator
221 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
222
223 Ok(self
224 .inner
225 .beacon_consensus
226 .new_payload(payload)
227 .await
228 .inspect(|_| self.inner.on_new_payload_response())?)
229 }
230
231 pub async fn new_payload_v3_metered(
233 &self,
234 payload: PayloadT::ExecutionData,
235 ) -> RpcResult<PayloadStatus> {
236 let start = Instant::now();
237 let gas_used = payload.gas_used();
238
239 let res = Self::new_payload_v3(self, payload).await;
240 let elapsed = start.elapsed();
241 self.inner.metrics.latency.new_payload_v3.record(elapsed);
242 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
243 Ok(res?)
244 }
245
246 pub async fn new_payload_v4(
248 &self,
249 payload: PayloadT::ExecutionData,
250 ) -> EngineApiResult<PayloadStatus> {
251 let payload_or_attrs = PayloadOrAttributes::<
252 '_,
253 PayloadT::ExecutionData,
254 PayloadT::PayloadAttributes,
255 >::from_execution_payload(&payload);
256 self.inner
257 .validator
258 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
259
260 Ok(self
261 .inner
262 .beacon_consensus
263 .new_payload(payload)
264 .await
265 .inspect(|_| self.inner.on_new_payload_response())?)
266 }
267
268 pub async fn new_payload_v4_metered(
270 &self,
271 payload: PayloadT::ExecutionData,
272 ) -> RpcResult<PayloadStatus> {
273 let start = Instant::now();
274 let gas_used = payload.gas_used();
275
276 let res = Self::new_payload_v4(self, payload).await;
277
278 let elapsed = start.elapsed();
279 self.inner.metrics.latency.new_payload_v4.record(elapsed);
280 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
281 Ok(res?)
282 }
283
284 pub fn accept_execution_requests_hash(&self) -> bool {
286 self.inner.accept_execution_requests_hash
287 }
288}
289
290impl<Provider, EngineT, Pool, Validator, ChainSpec>
291 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
292where
293 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
294 EngineT: EngineTypes,
295 Pool: TransactionPool + 'static,
296 Validator: EngineApiValidator<EngineT>,
297 ChainSpec: EthereumHardforks + Send + Sync + 'static,
298{
299 pub async fn fork_choice_updated_v1(
306 &self,
307 state: ForkchoiceState,
308 payload_attrs: Option<EngineT::PayloadAttributes>,
309 ) -> EngineApiResult<ForkchoiceUpdated> {
310 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
311 .await
312 }
313
314 pub async fn fork_choice_updated_v1_metered(
316 &self,
317 state: ForkchoiceState,
318 payload_attrs: Option<EngineT::PayloadAttributes>,
319 ) -> EngineApiResult<ForkchoiceUpdated> {
320 let start = Instant::now();
321 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
322 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
323 self.inner.metrics.fcu_response.update_response_metrics(&res);
324 res
325 }
326
327 pub async fn fork_choice_updated_v2(
332 &self,
333 state: ForkchoiceState,
334 payload_attrs: Option<EngineT::PayloadAttributes>,
335 ) -> EngineApiResult<ForkchoiceUpdated> {
336 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
337 .await
338 }
339
340 pub async fn fork_choice_updated_v2_metered(
342 &self,
343 state: ForkchoiceState,
344 payload_attrs: Option<EngineT::PayloadAttributes>,
345 ) -> EngineApiResult<ForkchoiceUpdated> {
346 let start = Instant::now();
347 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
348 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
349 self.inner.metrics.fcu_response.update_response_metrics(&res);
350 res
351 }
352
353 pub async fn fork_choice_updated_v3(
358 &self,
359 state: ForkchoiceState,
360 payload_attrs: Option<EngineT::PayloadAttributes>,
361 ) -> EngineApiResult<ForkchoiceUpdated> {
362 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
363 .await
364 }
365
366 pub async fn fork_choice_updated_v3_metered(
368 &self,
369 state: ForkchoiceState,
370 payload_attrs: Option<EngineT::PayloadAttributes>,
371 ) -> EngineApiResult<ForkchoiceUpdated> {
372 let start = Instant::now();
373 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
374 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
375 self.inner.metrics.fcu_response.update_response_metrics(&res);
376 res
377 }
378
379 async fn get_built_payload(
381 &self,
382 payload_id: PayloadId,
383 ) -> EngineApiResult<EngineT::BuiltPayload> {
384 self.inner
385 .payload_store
386 .resolve(payload_id)
387 .await
388 .ok_or(EngineApiError::UnknownPayload)?
389 .map_err(|_| EngineApiError::UnknownPayload)
390 }
391
392 async fn get_payload_inner<R>(
395 &self,
396 payload_id: PayloadId,
397 version: EngineApiMessageVersion,
398 ) -> EngineApiResult<R>
399 where
400 EngineT::BuiltPayload: TryInto<R>,
401 {
402 let timestamp = self.get_payload_timestamp(payload_id).await?;
404 validate_payload_timestamp(&self.inner.chain_spec, version, timestamp)?;
405
406 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
408 warn!(?version, "could not transform built payload");
409 EngineApiError::UnknownPayload
410 })
411 }
412
413 pub async fn get_payload_v1(
423 &self,
424 payload_id: PayloadId,
425 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
426 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
427 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
428 EngineApiError::UnknownPayload
429 })
430 }
431
432 pub async fn get_payload_v1_metered(
434 &self,
435 payload_id: PayloadId,
436 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
437 let start = Instant::now();
438 let res = Self::get_payload_v1(self, payload_id).await;
439 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
440 res
441 }
442
443 pub async fn get_payload_v2(
451 &self,
452 payload_id: PayloadId,
453 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
454 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
455 }
456
457 pub async fn get_payload_v2_metered(
459 &self,
460 payload_id: PayloadId,
461 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
462 let start = Instant::now();
463 let res = Self::get_payload_v2(self, payload_id).await;
464 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
465 res
466 }
467
468 pub async fn get_payload_v3(
476 &self,
477 payload_id: PayloadId,
478 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
479 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
480 }
481
482 pub async fn get_payload_v3_metered(
484 &self,
485 payload_id: PayloadId,
486 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
487 let start = Instant::now();
488 let res = Self::get_payload_v3(self, payload_id).await;
489 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
490 res
491 }
492
493 pub async fn get_payload_v4(
501 &self,
502 payload_id: PayloadId,
503 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
504 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
505 }
506
507 pub async fn get_payload_v4_metered(
509 &self,
510 payload_id: PayloadId,
511 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
512 let start = Instant::now();
513 let res = Self::get_payload_v4(self, payload_id).await;
514 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
515 res
516 }
517
518 pub async fn get_payload_v5(
528 &self,
529 payload_id: PayloadId,
530 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
531 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
532 }
533
534 pub async fn get_payload_v5_metered(
536 &self,
537 payload_id: PayloadId,
538 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
539 let start = Instant::now();
540 let res = Self::get_payload_v5(self, payload_id).await;
541 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
542 res
543 }
544
545 pub async fn get_payload_bodies_by_range_with<F, R>(
548 &self,
549 start: BlockNumber,
550 count: u64,
551 f: F,
552 ) -> EngineApiResult<Vec<Option<R>>>
553 where
554 F: Fn(Provider::Block) -> R + Send + 'static,
555 R: Send + 'static,
556 {
557 let (tx, rx) = oneshot::channel();
558 let inner = self.inner.clone();
559
560 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
561 if count > MAX_PAYLOAD_BODIES_LIMIT {
562 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
563 return;
564 }
565
566 if start == 0 || count == 0 {
567 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
568 return;
569 }
570
571 let mut result = Vec::with_capacity(count as usize);
572
573 let mut end = start.saturating_add(count - 1);
575
576 if let Ok(best_block) = inner.provider.best_block_number()
579 && end > best_block {
580 end = best_block;
581 }
582
583 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
585 for num in start..=end {
586 if num < earliest_block {
587 result.push(None);
588 continue;
589 }
590 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
591 match block_result {
592 Ok(block) => {
593 result.push(block.map(&f));
594 }
595 Err(err) => {
596 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
597 return;
598 }
599 };
600 }
601 tx.send(Ok(result)).ok();
602 }));
603
604 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
605 }
606
607 pub async fn get_payload_bodies_by_range_v1(
618 &self,
619 start: BlockNumber,
620 count: u64,
621 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
622 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
623 transactions: block.body().encoded_2718_transactions(),
624 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
625 })
626 .await
627 }
628
629 pub async fn get_payload_bodies_by_range_v1_metered(
631 &self,
632 start: BlockNumber,
633 count: u64,
634 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
635 let start_time = Instant::now();
636 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
637 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
638 res
639 }
640
641 pub async fn get_payload_bodies_by_hash_with<F, R>(
643 &self,
644 hashes: Vec<BlockHash>,
645 f: F,
646 ) -> EngineApiResult<Vec<Option<R>>>
647 where
648 F: Fn(Provider::Block) -> R + Send + 'static,
649 R: Send + 'static,
650 {
651 let len = hashes.len() as u64;
652 if len > MAX_PAYLOAD_BODIES_LIMIT {
653 return Err(EngineApiError::PayloadRequestTooLarge { len });
654 }
655
656 let (tx, rx) = oneshot::channel();
657 let inner = self.inner.clone();
658
659 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
660 let mut result = Vec::with_capacity(hashes.len());
661 for hash in hashes {
662 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
663 match block_result {
664 Ok(block) => {
665 result.push(block.map(&f));
666 }
667 Err(err) => {
668 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
669 return;
670 }
671 }
672 }
673 tx.send(Ok(result)).ok();
674 }));
675
676 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
677 }
678
679 pub async fn get_payload_bodies_by_hash_v1(
681 &self,
682 hashes: Vec<BlockHash>,
683 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
684 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
685 transactions: block.body().encoded_2718_transactions(),
686 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
687 })
688 .await
689 }
690
691 pub async fn get_payload_bodies_by_hash_v1_metered(
693 &self,
694 hashes: Vec<BlockHash>,
695 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
696 let start = Instant::now();
697 let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
698 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
699 res.await
700 }
701
702 async fn validate_and_execute_forkchoice(
716 &self,
717 version: EngineApiMessageVersion,
718 state: ForkchoiceState,
719 payload_attrs: Option<EngineT::PayloadAttributes>,
720 ) -> EngineApiResult<ForkchoiceUpdated> {
721 self.inner.record_elapsed_time_on_fcu();
722
723 if let Some(ref attrs) = payload_attrs {
724 let attr_validation_res =
725 self.inner.validator.ensure_well_formed_attributes(version, attrs);
726
727 if let Err(err) = attr_validation_res {
741 let fcu_res =
742 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
743 if fcu_res.is_invalid() {
746 return Ok(fcu_res)
747 }
748 return Err(err.into())
749 }
750 }
751
752 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
753 }
754
755 pub fn capabilities(&self) -> &EngineCapabilities {
757 &self.inner.capabilities
758 }
759
760 fn get_blobs_v1(
761 &self,
762 versioned_hashes: Vec<B256>,
763 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
764 let current_timestamp =
766 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
767 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
768 return Err(EngineApiError::EngineObjectValidationError(
769 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
770 ));
771 }
772
773 if versioned_hashes.len() > MAX_BLOB_LIMIT {
774 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
775 }
776
777 self.inner
778 .tx_pool
779 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
780 .map_err(|err| EngineApiError::Internal(Box::new(err)))
781 }
782
783 pub fn get_blobs_v1_metered(
785 &self,
786 versioned_hashes: Vec<B256>,
787 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
788 let hashes_len = versioned_hashes.len();
789 let start = Instant::now();
790 let res = Self::get_blobs_v1(self, versioned_hashes);
791 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
792
793 if let Ok(blobs) = &res {
794 let blobs_found = blobs.iter().flatten().count();
795 let blobs_missed = hashes_len - blobs_found;
796
797 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
798 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
799 }
800
801 res
802 }
803
804 fn get_blobs_v2(
805 &self,
806 versioned_hashes: Vec<B256>,
807 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
808 let current_timestamp =
810 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
811 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
812 return Err(EngineApiError::EngineObjectValidationError(
813 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
814 ));
815 }
816
817 if versioned_hashes.len() > MAX_BLOB_LIMIT {
818 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
819 }
820
821 self.inner
822 .tx_pool
823 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
824 .map_err(|err| EngineApiError::Internal(Box::new(err)))
825 }
826
827 pub fn get_blobs_v2_metered(
829 &self,
830 versioned_hashes: Vec<B256>,
831 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
832 let hashes_len = versioned_hashes.len();
833 let start = Instant::now();
834 let res = Self::get_blobs_v2(self, versioned_hashes);
835 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
836
837 if let Ok(blobs) = &res {
838 let blobs_found = blobs.iter().flatten().count();
839
840 self.inner
841 .metrics
842 .blob_metrics
843 .get_blobs_requests_blobs_total
844 .increment(hashes_len as u64);
845 self.inner
846 .metrics
847 .blob_metrics
848 .get_blobs_requests_blobs_in_blobpool_total
849 .increment(blobs_found as u64);
850
851 if blobs_found == hashes_len {
852 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
853 } else {
854 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
855 }
856 } else {
857 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
858 }
859
860 res
861 }
862}
863
864#[async_trait]
866impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
867 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
868where
869 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
870 EngineT: EngineTypes<ExecutionData = ExecutionData>,
871 Pool: TransactionPool + 'static,
872 Validator: EngineApiValidator<EngineT>,
873 ChainSpec: EthereumHardforks + Send + Sync + 'static,
874{
875 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
879 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
880 let payload =
881 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
882 Ok(self.new_payload_v1_metered(payload).await?)
883 }
884
885 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
888 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
889 let payload = ExecutionData {
890 payload: payload.into_payload(),
891 sidecar: ExecutionPayloadSidecar::none(),
892 };
893
894 Ok(self.new_payload_v2_metered(payload).await?)
895 }
896
897 async fn new_payload_v3(
900 &self,
901 payload: ExecutionPayloadV3,
902 versioned_hashes: Vec<B256>,
903 parent_beacon_block_root: B256,
904 ) -> RpcResult<PayloadStatus> {
905 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
906 let payload = ExecutionData {
907 payload: payload.into(),
908 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
909 versioned_hashes,
910 parent_beacon_block_root,
911 }),
912 };
913
914 Ok(self.new_payload_v3_metered(payload).await?)
915 }
916
917 async fn new_payload_v4(
920 &self,
921 payload: ExecutionPayloadV3,
922 versioned_hashes: Vec<B256>,
923 parent_beacon_block_root: B256,
924 requests: RequestsOrHash,
925 ) -> RpcResult<PayloadStatus> {
926 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
927
928 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
930 return Err(EngineApiError::UnexpectedRequestsHash.into());
931 }
932
933 let payload = ExecutionData {
934 payload: payload.into(),
935 sidecar: ExecutionPayloadSidecar::v4(
936 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
937 PraguePayloadFields { requests },
938 ),
939 };
940
941 Ok(self.new_payload_v4_metered(payload).await?)
942 }
943
944 async fn fork_choice_updated_v1(
949 &self,
950 fork_choice_state: ForkchoiceState,
951 payload_attributes: Option<EngineT::PayloadAttributes>,
952 ) -> RpcResult<ForkchoiceUpdated> {
953 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
954 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
955 }
956
957 async fn fork_choice_updated_v2(
960 &self,
961 fork_choice_state: ForkchoiceState,
962 payload_attributes: Option<EngineT::PayloadAttributes>,
963 ) -> RpcResult<ForkchoiceUpdated> {
964 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
965 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
966 }
967
968 async fn fork_choice_updated_v3(
972 &self,
973 fork_choice_state: ForkchoiceState,
974 payload_attributes: Option<EngineT::PayloadAttributes>,
975 ) -> RpcResult<ForkchoiceUpdated> {
976 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
977 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
978 }
979
980 async fn get_payload_v1(
992 &self,
993 payload_id: PayloadId,
994 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
995 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
996 Ok(self.get_payload_v1_metered(payload_id).await?)
997 }
998
999 async fn get_payload_v2(
1009 &self,
1010 payload_id: PayloadId,
1011 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1012 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1013 Ok(self.get_payload_v2_metered(payload_id).await?)
1014 }
1015
1016 async fn get_payload_v3(
1026 &self,
1027 payload_id: PayloadId,
1028 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1029 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1030 Ok(self.get_payload_v3_metered(payload_id).await?)
1031 }
1032
1033 async fn get_payload_v4(
1043 &self,
1044 payload_id: PayloadId,
1045 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1046 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1047 Ok(self.get_payload_v4_metered(payload_id).await?)
1048 }
1049
1050 async fn get_payload_v5(
1060 &self,
1061 payload_id: PayloadId,
1062 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1063 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1064 Ok(self.get_payload_v5_metered(payload_id).await?)
1065 }
1066
1067 async fn get_payload_bodies_by_hash_v1(
1070 &self,
1071 block_hashes: Vec<BlockHash>,
1072 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1073 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1074 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1075 }
1076
1077 async fn get_payload_bodies_by_range_v1(
1094 &self,
1095 start: U64,
1096 count: U64,
1097 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1098 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1099 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1100 }
1101
1102 async fn get_client_version_v1(
1106 &self,
1107 client: ClientVersionV1,
1108 ) -> RpcResult<Vec<ClientVersionV1>> {
1109 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1110 Ok(Self::get_client_version_v1(self, client)?)
1111 }
1112
1113 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1116 Ok(self.capabilities().list())
1117 }
1118
1119 async fn get_blobs_v1(
1120 &self,
1121 versioned_hashes: Vec<B256>,
1122 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1123 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1124 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1125 }
1126
1127 async fn get_blobs_v2(
1128 &self,
1129 versioned_hashes: Vec<B256>,
1130 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1131 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1132 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1133 }
1134}
1135
1136impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1137 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1138where
1139 EngineT: EngineTypes,
1140 Self: EngineApiServer<EngineT>,
1141{
1142 fn into_rpc_module(self) -> RpcModule<()> {
1143 self.into_rpc().remove_context()
1144 }
1145}
1146
1147impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1148 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1149where
1150 PayloadT: PayloadTypes,
1151{
1152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1153 f.debug_struct("EngineApi").finish_non_exhaustive()
1154 }
1155}
1156
1157impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1158 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1159where
1160 PayloadT: PayloadTypes,
1161{
1162 fn clone(&self) -> Self {
1163 Self { inner: Arc::clone(&self.inner) }
1164 }
1165}
1166
1167struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1169 provider: Provider,
1171 chain_spec: Arc<ChainSpec>,
1173 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1175 payload_store: PayloadStore<PayloadT>,
1177 task_spawner: Box<dyn TaskSpawner>,
1179 metrics: EngineApiMetrics,
1181 client: ClientVersionV1,
1183 capabilities: EngineCapabilities,
1185 tx_pool: Pool,
1187 validator: Validator,
1189 latest_new_payload_response: Mutex<Option<Instant>>,
1191 accept_execution_requests_hash: bool,
1192}
1193
1194impl<Provider, PayloadT, Pool, Validator, ChainSpec>
1195 EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
1196where
1197 PayloadT: PayloadTypes,
1198{
1199 fn record_elapsed_time_on_fcu(&self) {
1202 if let Some(start_time) = self.latest_new_payload_response.lock().take() {
1203 let elapsed_time = start_time.elapsed();
1204 self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
1205 }
1206 }
1207
1208 fn on_new_payload_response(&self) {
1210 self.latest_new_payload_response.lock().replace(Instant::now());
1211 }
1212}
1213
1214#[cfg(test)]
1215mod tests {
1216 use super::*;
1217 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1218 use assert_matches::assert_matches;
1219 use reth_chainspec::{ChainSpec, MAINNET};
1220 use reth_engine_primitives::BeaconEngineMessage;
1221 use reth_ethereum_engine_primitives::EthEngineTypes;
1222 use reth_ethereum_primitives::Block;
1223 use reth_node_ethereum::EthereumEngineValidator;
1224 use reth_payload_builder::test_utils::spawn_test_payload_service;
1225 use reth_provider::test_utils::MockEthProvider;
1226 use reth_tasks::TokioTaskExecutor;
1227 use reth_transaction_pool::noop::NoopTransactionPool;
1228 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1229
1230 fn setup_engine_api() -> (
1231 EngineApiTestHandle,
1232 EngineApi<
1233 Arc<MockEthProvider>,
1234 EthEngineTypes,
1235 NoopTransactionPool,
1236 EthereumEngineValidator,
1237 ChainSpec,
1238 >,
1239 ) {
1240 let client = ClientVersionV1 {
1241 code: ClientCode::RH,
1242 name: "Reth".to_string(),
1243 version: "v0.2.0-beta.5".to_string(),
1244 commit: "defa64b2".to_string(),
1245 };
1246
1247 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1248 let provider = Arc::new(MockEthProvider::default());
1249 let payload_store = spawn_test_payload_service();
1250 let (to_engine, engine_rx) = unbounded_channel();
1251 let task_executor = Box::<TokioTaskExecutor>::default();
1252 let api = EngineApi::new(
1253 provider.clone(),
1254 chain_spec.clone(),
1255 ConsensusEngineHandle::new(to_engine),
1256 payload_store.into(),
1257 NoopTransactionPool::default(),
1258 task_executor,
1259 client,
1260 EngineCapabilities::default(),
1261 EthereumEngineValidator::new(chain_spec.clone()),
1262 false,
1263 );
1264 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1265 (handle, api)
1266 }
1267
1268 #[tokio::test]
1269 async fn engine_client_version_v1() {
1270 let client = ClientVersionV1 {
1271 code: ClientCode::RH,
1272 name: "Reth".to_string(),
1273 version: "v0.2.0-beta.5".to_string(),
1274 commit: "defa64b2".to_string(),
1275 };
1276 let (_, api) = setup_engine_api();
1277 let res = api.get_client_version_v1(client.clone());
1278 assert_eq!(res.unwrap(), vec![client]);
1279 }
1280
1281 struct EngineApiTestHandle {
1282 #[allow(dead_code)]
1283 chain_spec: Arc<ChainSpec>,
1284 provider: Arc<MockEthProvider>,
1285 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1286 }
1287
1288 #[tokio::test]
1289 async fn forwards_responses_to_consensus_engine() {
1290 let (mut handle, api) = setup_engine_api();
1291
1292 tokio::spawn(async move {
1293 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1294 let execution_data = ExecutionData {
1295 payload: payload_v1.into(),
1296 sidecar: ExecutionPayloadSidecar::none(),
1297 };
1298
1299 api.new_payload_v1(execution_data).await.unwrap();
1300 });
1301 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1302 }
1303
1304 mod get_payload_bodies {
1306 use super::*;
1307 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1308 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1309
1310 #[tokio::test]
1311 async fn invalid_params() {
1312 let (_, api) = setup_engine_api();
1313
1314 let by_range_tests = [
1315 (0, 0),
1317 (0, 1),
1318 (1, 0),
1319 ];
1320
1321 for (start, count) in by_range_tests {
1323 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1324 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1325 }
1326 }
1327
1328 #[tokio::test]
1329 async fn request_too_large() {
1330 let (_, api) = setup_engine_api();
1331
1332 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1333 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1334 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1335 }
1336
1337 #[tokio::test]
1338 async fn returns_payload_bodies() {
1339 let mut rng = generators::rng();
1340 let (handle, api) = setup_engine_api();
1341
1342 let (start, count) = (1, 10);
1343 let blocks = random_block_range(
1344 &mut rng,
1345 start..=start + count - 1,
1346 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1347 );
1348 handle
1349 .provider
1350 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1351
1352 let expected = blocks
1353 .iter()
1354 .cloned()
1355 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1356 .collect::<Vec<_>>();
1357
1358 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1359 assert_eq!(res, expected);
1360 }
1361
1362 #[tokio::test]
1363 async fn returns_payload_bodies_with_gaps() {
1364 let mut rng = generators::rng();
1365 let (handle, api) = setup_engine_api();
1366
1367 let (start, count) = (1, 100);
1368 let blocks = random_block_range(
1369 &mut rng,
1370 start..=start + count - 1,
1371 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1372 );
1373
1374 let first_missing_range = 26..=50;
1376 let second_missing_range = 76..=100;
1377 handle.provider.extend_blocks(
1378 blocks
1379 .iter()
1380 .filter(|b| {
1381 !first_missing_range.contains(&b.number) &&
1382 !second_missing_range.contains(&b.number)
1383 })
1384 .map(|b| (b.hash(), b.clone().into_block())),
1385 );
1386
1387 let expected = blocks
1388 .iter()
1389 .filter(|b| !second_missing_range.contains(&b.number))
1392 .cloned()
1393 .map(|b| {
1394 if first_missing_range.contains(&b.number) {
1395 None
1396 } else {
1397 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1398 }
1399 })
1400 .collect::<Vec<_>>();
1401
1402 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1403 assert_eq!(res, expected);
1404
1405 let expected = blocks
1406 .iter()
1407 .cloned()
1408 .map(|b| {
1411 if first_missing_range.contains(&b.number) ||
1412 second_missing_range.contains(&b.number)
1413 {
1414 None
1415 } else {
1416 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1417 }
1418 })
1419 .collect::<Vec<_>>();
1420
1421 let hashes = blocks.iter().map(|b| b.hash()).collect();
1422 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1423 assert_eq!(res, expected);
1424 }
1425 }
1426}