1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::NetworkInfo;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
/// One-shot channel sender that delivers an [`EngineApiResult`] to whoever awaits the receiver.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
/// Upper bound on the number of payload bodies a single by-hash or by-range request may ask for.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
45
/// Upper bound on the number of versioned hashes a single blob request may contain.
const MAX_BLOB_LIMIT: usize = 128;
48
/// Engine API handler.
///
/// This is a thin handle: all state lives in a reference-counted [`EngineApiInner`].
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Shared state backing every request served through this handle.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
67
impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
{
    /// Returns a reference to the chain spec this handler was configured with.
    pub fn chain_spec(&self) -> &Arc<ChainSpec> {
        &self.inner.chain_spec
    }
}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
81 PayloadT: PayloadTypes,
82 Pool: TransactionPool + 'static,
83 Validator: EngineApiValidator<PayloadT>,
84 ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86 #[expect(clippy::too_many_arguments)]
88 pub fn new(
89 provider: Provider,
90 chain_spec: Arc<ChainSpec>,
91 beacon_consensus: ConsensusEngineHandle<PayloadT>,
92 payload_store: PayloadStore<PayloadT>,
93 tx_pool: Pool,
94 task_spawner: Runtime,
95 client: ClientVersionV1,
96 capabilities: EngineCapabilities,
97 validator: Validator,
98 accept_execution_requests_hash: bool,
99 network: impl NetworkInfo + 'static,
100 ) -> Self {
101 let is_syncing = Arc::new(move || network.is_syncing());
102 let inner = Arc::new(EngineApiInner {
103 provider,
104 chain_spec,
105 beacon_consensus,
106 payload_store,
107 task_spawner,
108 metrics: EngineApiMetrics::default(),
109 client,
110 capabilities,
111 tx_pool,
112 validator,
113 accept_execution_requests_hash,
114 is_syncing,
115 });
116 Self { inner }
117 }
118
119 pub fn get_client_version_v1(
121 &self,
122 _client: ClientVersionV1,
123 ) -> EngineApiResult<Vec<ClientVersionV1>> {
124 Ok(vec![self.inner.client.clone()])
125 }
126
127 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
129 Ok(self
130 .inner
131 .payload_store
132 .payload_timestamp(payload_id)
133 .await
134 .ok_or(EngineApiError::UnknownPayload)??)
135 }
136
137 pub async fn new_payload_v1(
140 &self,
141 payload: PayloadT::ExecutionData,
142 ) -> EngineApiResult<PayloadStatus> {
143 let payload_or_attrs = PayloadOrAttributes::<
144 '_,
145 PayloadT::ExecutionData,
146 PayloadT::PayloadAttributes,
147 >::from_execution_payload(&payload);
148
149 self.inner
150 .validator
151 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
154 }
155
156 pub async fn new_payload_v1_metered(
158 &self,
159 payload: PayloadT::ExecutionData,
160 ) -> EngineApiResult<PayloadStatus> {
161 let start = Instant::now();
162 let res = Self::new_payload_v1(self, payload).await;
163 let elapsed = start.elapsed();
164 self.inner.metrics.latency.new_payload_v1.record(elapsed);
165 res
166 }
167
168 pub async fn new_payload_v2(
170 &self,
171 payload: PayloadT::ExecutionData,
172 ) -> EngineApiResult<PayloadStatus> {
173 let payload_or_attrs = PayloadOrAttributes::<
174 '_,
175 PayloadT::ExecutionData,
176 PayloadT::PayloadAttributes,
177 >::from_execution_payload(&payload);
178 self.inner
179 .validator
180 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
181 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
182 }
183
184 pub async fn new_payload_v2_metered(
186 &self,
187 payload: PayloadT::ExecutionData,
188 ) -> EngineApiResult<PayloadStatus> {
189 let start = Instant::now();
190 let res = Self::new_payload_v2(self, payload).await;
191 let elapsed = start.elapsed();
192 self.inner.metrics.latency.new_payload_v2.record(elapsed);
193 res
194 }
195
196 pub async fn new_payload_v3(
198 &self,
199 payload: PayloadT::ExecutionData,
200 ) -> EngineApiResult<PayloadStatus> {
201 let payload_or_attrs = PayloadOrAttributes::<
202 '_,
203 PayloadT::ExecutionData,
204 PayloadT::PayloadAttributes,
205 >::from_execution_payload(&payload);
206 self.inner
207 .validator
208 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
209
210 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
211 }
212
213 pub async fn new_payload_v3_metered(
215 &self,
216 payload: PayloadT::ExecutionData,
217 ) -> RpcResult<PayloadStatus> {
218 let start = Instant::now();
219
220 let res = Self::new_payload_v3(self, payload).await;
221 let elapsed = start.elapsed();
222 self.inner.metrics.latency.new_payload_v3.record(elapsed);
223 Ok(res?)
224 }
225
226 pub async fn new_payload_v4(
228 &self,
229 payload: PayloadT::ExecutionData,
230 ) -> EngineApiResult<PayloadStatus> {
231 let payload_or_attrs = PayloadOrAttributes::<
232 '_,
233 PayloadT::ExecutionData,
234 PayloadT::PayloadAttributes,
235 >::from_execution_payload(&payload);
236 self.inner
237 .validator
238 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
239
240 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
241 }
242
243 pub async fn new_payload_v4_metered(
245 &self,
246 payload: PayloadT::ExecutionData,
247 ) -> RpcResult<PayloadStatus> {
248 let start = Instant::now();
249 let res = Self::new_payload_v4(self, payload).await;
250
251 let elapsed = start.elapsed();
252 self.inner.metrics.latency.new_payload_v4.record(elapsed);
253 Ok(res?)
254 }
255
256 pub async fn new_payload_v5(
262 &self,
263 payload: PayloadT::ExecutionData,
264 ) -> EngineApiResult<PayloadStatus> {
265 let payload_or_attrs = PayloadOrAttributes::<
266 '_,
267 PayloadT::ExecutionData,
268 PayloadT::PayloadAttributes,
269 >::from_execution_payload(&payload);
270 self.inner
271 .validator
272 .validate_version_specific_fields(EngineApiMessageVersion::V6, payload_or_attrs)?;
273 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
274 }
275
276 pub async fn new_payload_v5_metered(
278 &self,
279 payload: PayloadT::ExecutionData,
280 ) -> RpcResult<PayloadStatus> {
281 let start = Instant::now();
282 let res = Self::new_payload_v5(self, payload).await;
283 let elapsed = start.elapsed();
284 self.inner.metrics.latency.new_payload_v5.record(elapsed);
285 Ok(res?)
286 }
287
/// Returns the `accept_execution_requests_hash` flag this handler was constructed with.
pub fn accept_execution_requests_hash(&self) -> bool {
    self.inner.accept_execution_requests_hash
}
292}
293
294impl<Provider, EngineT, Pool, Validator, ChainSpec>
295 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
296where
297 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
298 EngineT: EngineTypes,
299 Pool: TransactionPool + 'static,
300 Validator: EngineApiValidator<EngineT>,
301 ChainSpec: EthereumHardforks + Send + Sync + 'static,
302{
303 pub async fn fork_choice_updated_v1(
310 &self,
311 state: ForkchoiceState,
312 payload_attrs: Option<EngineT::PayloadAttributes>,
313 ) -> EngineApiResult<ForkchoiceUpdated> {
314 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
315 .await
316 }
317
318 pub async fn fork_choice_updated_v1_metered(
320 &self,
321 state: ForkchoiceState,
322 payload_attrs: Option<EngineT::PayloadAttributes>,
323 ) -> EngineApiResult<ForkchoiceUpdated> {
324 let start = Instant::now();
325 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
326 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
327 res
328 }
329
330 pub async fn fork_choice_updated_v2(
335 &self,
336 state: ForkchoiceState,
337 payload_attrs: Option<EngineT::PayloadAttributes>,
338 ) -> EngineApiResult<ForkchoiceUpdated> {
339 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
340 .await
341 }
342
343 pub async fn fork_choice_updated_v2_metered(
345 &self,
346 state: ForkchoiceState,
347 payload_attrs: Option<EngineT::PayloadAttributes>,
348 ) -> EngineApiResult<ForkchoiceUpdated> {
349 let start = Instant::now();
350 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
351 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
352 res
353 }
354
355 pub async fn fork_choice_updated_v3(
360 &self,
361 state: ForkchoiceState,
362 payload_attrs: Option<EngineT::PayloadAttributes>,
363 ) -> EngineApiResult<ForkchoiceUpdated> {
364 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
365 .await
366 }
367
368 pub async fn fork_choice_updated_v3_metered(
370 &self,
371 state: ForkchoiceState,
372 payload_attrs: Option<EngineT::PayloadAttributes>,
373 ) -> EngineApiResult<ForkchoiceUpdated> {
374 let start = Instant::now();
375 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
376 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
377 res
378 }
379
380 pub async fn fork_choice_updated_v4(
385 &self,
386 state: ForkchoiceState,
387 payload_attrs: Option<EngineT::PayloadAttributes>,
388 ) -> EngineApiResult<ForkchoiceUpdated> {
389 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V6, state, payload_attrs)
390 .await
391 }
392
393 pub async fn fork_choice_updated_v4_metered(
395 &self,
396 state: ForkchoiceState,
397 payload_attrs: Option<EngineT::PayloadAttributes>,
398 ) -> EngineApiResult<ForkchoiceUpdated> {
399 let start = Instant::now();
400 let res = Self::fork_choice_updated_v4(self, state, payload_attrs).await;
401 self.inner.metrics.latency.fork_choice_updated_v4.record(start.elapsed());
402 res
403 }
404
405 async fn get_built_payload(
407 &self,
408 payload_id: PayloadId,
409 ) -> EngineApiResult<EngineT::BuiltPayload> {
410 self.inner
411 .payload_store
412 .resolve(payload_id)
413 .await
414 .ok_or(EngineApiError::UnknownPayload)?
415 .map_err(|_| EngineApiError::UnknownPayload)
416 }
417
418 async fn get_payload_inner<R>(
421 &self,
422 payload_id: PayloadId,
423 version: EngineApiMessageVersion,
424 ) -> EngineApiResult<R>
425 where
426 EngineT::BuiltPayload: TryInto<R>,
427 {
428 let timestamp = self.get_payload_timestamp(payload_id).await?;
431 validate_payload_timestamp(
432 &self.inner.chain_spec,
433 version,
434 timestamp,
435 MessageValidationKind::GetPayload,
436 )?;
437
438 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
440 warn!(?version, "could not transform built payload");
441 EngineApiError::UnknownPayload
442 })
443 }
444
445 pub async fn get_payload_v1(
455 &self,
456 payload_id: PayloadId,
457 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
458 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
459 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
460 EngineApiError::UnknownPayload
461 })
462 }
463
464 pub async fn get_payload_v1_metered(
466 &self,
467 payload_id: PayloadId,
468 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
469 let start = Instant::now();
470 let res = Self::get_payload_v1(self, payload_id).await;
471 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
472 res
473 }
474
475 pub async fn get_payload_v2(
483 &self,
484 payload_id: PayloadId,
485 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
486 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
487 }
488
489 pub async fn get_payload_v2_metered(
491 &self,
492 payload_id: PayloadId,
493 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
494 let start = Instant::now();
495 let res = Self::get_payload_v2(self, payload_id).await;
496 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
497 res
498 }
499
500 pub async fn get_payload_v3(
508 &self,
509 payload_id: PayloadId,
510 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
511 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
512 }
513
514 pub async fn get_payload_v3_metered(
516 &self,
517 payload_id: PayloadId,
518 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
519 let start = Instant::now();
520 let res = Self::get_payload_v3(self, payload_id).await;
521 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
522 res
523 }
524
525 pub async fn get_payload_v4(
533 &self,
534 payload_id: PayloadId,
535 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
536 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
537 }
538
539 pub async fn get_payload_v4_metered(
541 &self,
542 payload_id: PayloadId,
543 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
544 let start = Instant::now();
545 let res = Self::get_payload_v4(self, payload_id).await;
546 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
547 res
548 }
549
550 pub async fn get_payload_v5(
560 &self,
561 payload_id: PayloadId,
562 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
563 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
564 }
565
566 pub async fn get_payload_v5_metered(
568 &self,
569 payload_id: PayloadId,
570 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
571 let start = Instant::now();
572 let res = Self::get_payload_v5(self, payload_id).await;
573 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
574 res
575 }
576
577 pub async fn get_payload_v6(
583 &self,
584 payload_id: PayloadId,
585 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
586 self.get_payload_inner(payload_id, EngineApiMessageVersion::V6).await
587 }
588
589 pub async fn get_payload_v6_metered(
591 &self,
592 payload_id: PayloadId,
593 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
594 let start = Instant::now();
595 let res = Self::get_payload_v6(self, payload_id).await;
596 self.inner.metrics.latency.get_payload_v6.record(start.elapsed());
597 res
598 }
599
600 pub async fn get_payload_bodies_by_range_with<F, R>(
603 &self,
604 start: BlockNumber,
605 count: u64,
606 f: F,
607 ) -> EngineApiResult<Vec<Option<R>>>
608 where
609 F: Fn(Provider::Block) -> R + Send + 'static,
610 R: Send + 'static,
611 {
612 let (tx, rx) = oneshot::channel();
613 let inner = self.inner.clone();
614
615 self.inner.task_spawner.spawn_blocking_task(async move {
616 if count > MAX_PAYLOAD_BODIES_LIMIT {
617 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
618 return;
619 }
620
621 if start == 0 || count == 0 {
622 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
623 return;
624 }
625
626 let mut result = Vec::with_capacity(count as usize);
627
628 let mut end = start.saturating_add(count - 1);
630
631 if let Ok(best_block) = inner.provider.best_block_number()
634 && end > best_block {
635 end = best_block;
636 }
637
638 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
640 for num in start..=end {
641 if num < earliest_block {
642 result.push(None);
643 continue;
644 }
645 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
646 match block_result {
647 Ok(block) => {
648 result.push(block.map(&f));
649 }
650 Err(err) => {
651 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
652 return;
653 }
654 };
655 }
656 tx.send(Ok(result)).ok();
657 });
658
659 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
660 }
661
662 pub async fn get_payload_bodies_by_range_v1(
673 &self,
674 start: BlockNumber,
675 count: u64,
676 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
677 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
678 transactions: block.body().encoded_2718_transactions(),
679 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
680 })
681 .await
682 }
683
684 pub async fn get_payload_bodies_by_range_v1_metered(
686 &self,
687 start: BlockNumber,
688 count: u64,
689 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
690 let start_time = Instant::now();
691 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
692 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
693 res
694 }
695
696 pub async fn get_payload_bodies_by_range_v2(
700 &self,
701 start: BlockNumber,
702 count: u64,
703 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
704 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 {
705 transactions: block.body().encoded_2718_transactions(),
706 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
707 block_access_list: None,
708 })
709 .await
710 }
711
712 pub async fn get_payload_bodies_by_range_v2_metered(
714 &self,
715 start: BlockNumber,
716 count: u64,
717 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
718 let start_time = Instant::now();
719 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
720 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
721 res
722 }
723
724 pub async fn get_payload_bodies_by_hash_with<F, R>(
726 &self,
727 hashes: Vec<BlockHash>,
728 f: F,
729 ) -> EngineApiResult<Vec<Option<R>>>
730 where
731 F: Fn(Provider::Block) -> R + Send + 'static,
732 R: Send + 'static,
733 {
734 let len = hashes.len() as u64;
735 if len > MAX_PAYLOAD_BODIES_LIMIT {
736 return Err(EngineApiError::PayloadRequestTooLarge { len });
737 }
738
739 let (tx, rx) = oneshot::channel();
740 let inner = self.inner.clone();
741
742 self.inner.task_spawner.spawn_blocking_task(async move {
743 let mut result = Vec::with_capacity(hashes.len());
744 for hash in hashes {
745 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
746 match block_result {
747 Ok(block) => {
748 result.push(block.map(&f));
749 }
750 Err(err) => {
751 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
752 return;
753 }
754 }
755 }
756 tx.send(Ok(result)).ok();
757 });
758
759 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
760 }
761
762 pub async fn get_payload_bodies_by_hash_v1(
764 &self,
765 hashes: Vec<BlockHash>,
766 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
767 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
768 transactions: block.body().encoded_2718_transactions(),
769 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
770 })
771 .await
772 }
773
774 pub async fn get_payload_bodies_by_hash_v1_metered(
776 &self,
777 hashes: Vec<BlockHash>,
778 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
779 let start = Instant::now();
780 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
781 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
782 res
783 }
784
785 pub async fn get_payload_bodies_by_hash_v2(
789 &self,
790 hashes: Vec<BlockHash>,
791 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
792 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV2 {
793 transactions: block.body().encoded_2718_transactions(),
794 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
795 block_access_list: None,
796 })
797 .await
798 }
799
800 pub async fn get_payload_bodies_by_hash_v2_metered(
802 &self,
803 hashes: Vec<BlockHash>,
804 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
805 let start = Instant::now();
806 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
807 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
808 res
809 }
810
811 async fn validate_and_execute_forkchoice(
825 &self,
826 version: EngineApiMessageVersion,
827 state: ForkchoiceState,
828 payload_attrs: Option<EngineT::PayloadAttributes>,
829 ) -> EngineApiResult<ForkchoiceUpdated> {
830 if let Some(ref attrs) = payload_attrs {
831 let attr_validation_res =
832 self.inner.validator.ensure_well_formed_attributes(version, attrs);
833
834 if let Err(err) = attr_validation_res {
844 let fcu_res = self.inner.beacon_consensus.fork_choice_updated(state, None).await?;
845 if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
846 return Ok(fcu_res)
847 }
848 return Err(err.into())
849 }
850 }
851
852 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
853 }
854
/// Returns the [`EngineCapabilities`] this handler was constructed with.
pub fn capabilities(&self) -> &EngineCapabilities {
    &self.inner.capabilities
}
859
860 fn get_blobs_v1(
861 &self,
862 versioned_hashes: Vec<B256>,
863 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
864 let current_timestamp =
866 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
867 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
868 return Err(EngineApiError::EngineObjectValidationError(
869 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
870 ));
871 }
872
873 if versioned_hashes.len() > MAX_BLOB_LIMIT {
874 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
875 }
876
877 self.inner
878 .tx_pool
879 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
880 .map_err(|err| EngineApiError::Internal(Box::new(err)))
881 }
882
883 pub fn get_blobs_v1_metered(
885 &self,
886 versioned_hashes: Vec<B256>,
887 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
888 let hashes_len = versioned_hashes.len();
889 let start = Instant::now();
890 let res = Self::get_blobs_v1(self, versioned_hashes);
891 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
892
893 if let Ok(blobs) = &res {
894 let blobs_found = blobs.iter().flatten().count();
895 let blobs_missed = hashes_len - blobs_found;
896
897 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
898 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
899 }
900
901 res
902 }
903
904 fn get_blobs_v2(
905 &self,
906 versioned_hashes: Vec<B256>,
907 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
908 let current_timestamp =
910 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
911 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
912 return Err(EngineApiError::EngineObjectValidationError(
913 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
914 ));
915 }
916
917 if versioned_hashes.len() > MAX_BLOB_LIMIT {
918 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
919 }
920
921 self.inner
922 .tx_pool
923 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
924 .map_err(|err| EngineApiError::Internal(Box::new(err)))
925 }
926
927 fn get_blobs_v3(
928 &self,
929 versioned_hashes: Vec<B256>,
930 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
931 let current_timestamp =
933 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
934 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
935 return Err(EngineApiError::EngineObjectValidationError(
936 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
937 ));
938 }
939
940 if versioned_hashes.len() > MAX_BLOB_LIMIT {
941 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
942 }
943
944 if (*self.inner.is_syncing)() {
946 return Ok(None)
947 }
948
949 self.inner
950 .tx_pool
951 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
952 .map(Some)
953 .map_err(|err| EngineApiError::Internal(Box::new(err)))
954 }
955
956 pub fn get_blobs_v2_metered(
958 &self,
959 versioned_hashes: Vec<B256>,
960 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
961 let hashes_len = versioned_hashes.len();
962 let start = Instant::now();
963 let res = Self::get_blobs_v2(self, versioned_hashes);
964 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
965
966 if let Ok(blobs) = &res {
967 let blobs_found = blobs.iter().flatten().count();
968
969 self.inner
970 .metrics
971 .blob_metrics
972 .get_blobs_requests_blobs_total
973 .increment(hashes_len as u64);
974 self.inner
975 .metrics
976 .blob_metrics
977 .get_blobs_requests_blobs_in_blobpool_total
978 .increment(blobs_found as u64);
979
980 if blobs_found == hashes_len {
981 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
982 } else {
983 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
984 }
985 } else {
986 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
987 }
988
989 res
990 }
991
992 pub fn get_blobs_v3_metered(
994 &self,
995 versioned_hashes: Vec<B256>,
996 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
997 let hashes_len = versioned_hashes.len();
998 let start = Instant::now();
999 let res = Self::get_blobs_v3(self, versioned_hashes);
1000 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
1001
1002 if let Ok(Some(blobs)) = &res {
1003 let blobs_found = blobs.iter().flatten().count();
1004 let blobs_missed = hashes_len - blobs_found;
1005
1006 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1007 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1008 }
1009
1010 res
1011 }
1012}
1013
1014#[async_trait]
1016impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
1017 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1018where
1019 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
1020 EngineT: EngineTypes<ExecutionData = ExecutionData>,
1021 Pool: TransactionPool + 'static,
1022 Validator: EngineApiValidator<EngineT>,
1023 ChainSpec: EthereumHardforks + Send + Sync + 'static,
1024{
1025 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
1029 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
1030 let payload =
1031 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
1032 Ok(self.new_payload_v1_metered(payload).await?)
1033 }
1034
1035 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
1038 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
1039 let payload = ExecutionData {
1040 payload: payload.into_payload(),
1041 sidecar: ExecutionPayloadSidecar::none(),
1042 };
1043
1044 Ok(self.new_payload_v2_metered(payload).await?)
1045 }
1046
1047 async fn new_payload_v3(
1050 &self,
1051 payload: ExecutionPayloadV3,
1052 versioned_hashes: Vec<B256>,
1053 parent_beacon_block_root: B256,
1054 ) -> RpcResult<PayloadStatus> {
1055 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
1056 let payload = ExecutionData {
1057 payload: payload.into(),
1058 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
1059 versioned_hashes,
1060 parent_beacon_block_root,
1061 }),
1062 };
1063
1064 Ok(self.new_payload_v3_metered(payload).await?)
1065 }
1066
1067 async fn new_payload_v4(
1070 &self,
1071 payload: ExecutionPayloadV3,
1072 versioned_hashes: Vec<B256>,
1073 parent_beacon_block_root: B256,
1074 requests: RequestsOrHash,
1075 ) -> RpcResult<PayloadStatus> {
1076 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1077
1078 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1080 return Err(EngineApiError::UnexpectedRequestsHash.into());
1081 }
1082
1083 let payload = ExecutionData {
1084 payload: payload.into(),
1085 sidecar: ExecutionPayloadSidecar::v4(
1086 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1087 PraguePayloadFields { requests },
1088 ),
1089 };
1090
1091 Ok(self.new_payload_v4_metered(payload).await?)
1092 }
1093
1094 async fn new_payload_v5(
1100 &self,
1101 payload: ExecutionPayloadV4,
1102 versioned_hashes: Vec<B256>,
1103 parent_beacon_block_root: B256,
1104 requests: RequestsOrHash,
1105 ) -> RpcResult<PayloadStatus> {
1106 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1107 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1109 return Err(EngineApiError::UnexpectedRequestsHash.into());
1110 }
1111
1112 let payload = ExecutionData {
1113 payload: payload.into(),
1114 sidecar: ExecutionPayloadSidecar::v4(
1115 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1116 PraguePayloadFields { requests },
1117 ),
1118 };
1119
1120 Ok(self.new_payload_v5_metered(payload).await?)
1121 }
1122
1123 async fn fork_choice_updated_v1(
1128 &self,
1129 fork_choice_state: ForkchoiceState,
1130 payload_attributes: Option<EngineT::PayloadAttributes>,
1131 ) -> RpcResult<ForkchoiceUpdated> {
1132 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1133 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1134 }
1135
1136 async fn fork_choice_updated_v2(
1139 &self,
1140 fork_choice_state: ForkchoiceState,
1141 payload_attributes: Option<EngineT::PayloadAttributes>,
1142 ) -> RpcResult<ForkchoiceUpdated> {
1143 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1144 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1145 }
1146
1147 async fn fork_choice_updated_v3(
1151 &self,
1152 fork_choice_state: ForkchoiceState,
1153 payload_attributes: Option<EngineT::PayloadAttributes>,
1154 ) -> RpcResult<ForkchoiceUpdated> {
1155 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1156 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1157 }
1158
1159 async fn fork_choice_updated_v4(
1163 &self,
1164 fork_choice_state: ForkchoiceState,
1165 payload_attributes: Option<EngineT::PayloadAttributes>,
1166 ) -> RpcResult<ForkchoiceUpdated> {
1167 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV4");
1168 Ok(self.fork_choice_updated_v4_metered(fork_choice_state, payload_attributes).await?)
1169 }
1170
1171 async fn get_payload_v1(
1183 &self,
1184 payload_id: PayloadId,
1185 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1186 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1187 Ok(self.get_payload_v1_metered(payload_id).await?)
1188 }
1189
1190 async fn get_payload_v2(
1200 &self,
1201 payload_id: PayloadId,
1202 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1203 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1204 Ok(self.get_payload_v2_metered(payload_id).await?)
1205 }
1206
1207 async fn get_payload_v3(
1217 &self,
1218 payload_id: PayloadId,
1219 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1220 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1221 Ok(self.get_payload_v3_metered(payload_id).await?)
1222 }
1223
1224 async fn get_payload_v4(
1234 &self,
1235 payload_id: PayloadId,
1236 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1237 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1238 Ok(self.get_payload_v4_metered(payload_id).await?)
1239 }
1240
1241 async fn get_payload_v5(
1251 &self,
1252 payload_id: PayloadId,
1253 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1254 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1255 Ok(self.get_payload_v5_metered(payload_id).await?)
1256 }
1257
1258 async fn get_payload_v6(
1264 &self,
1265 payload_id: PayloadId,
1266 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1267 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1268 Ok(self.get_payload_v6_metered(payload_id).await?)
1269 }
1270
1271 async fn get_payload_bodies_by_hash_v1(
1274 &self,
1275 block_hashes: Vec<BlockHash>,
1276 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1277 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1278 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1279 }
1280
1281 async fn get_payload_bodies_by_hash_v2(
1287 &self,
1288 block_hashes: Vec<BlockHash>,
1289 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1290 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1291 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1292 }
1293
1294 async fn get_payload_bodies_by_range_v1(
1311 &self,
1312 start: U64,
1313 count: U64,
1314 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1315 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1316 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1317 }
1318
1319 async fn get_payload_bodies_by_range_v2(
1325 &self,
1326 start: U64,
1327 count: U64,
1328 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1329 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1330 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1331 }
1332
1333 async fn get_client_version_v1(
1337 &self,
1338 client: ClientVersionV1,
1339 ) -> RpcResult<Vec<ClientVersionV1>> {
1340 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1341 Ok(Self::get_client_version_v1(self, client)?)
1342 }
1343
1344 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1347 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1348
1349 let el_caps = self.capabilities();
1350 el_caps.log_capability_mismatches(&capabilities);
1351
1352 Ok(el_caps.list())
1353 }
1354
1355 async fn get_blobs_v1(
1356 &self,
1357 versioned_hashes: Vec<B256>,
1358 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1359 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1360 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1361 }
1362
1363 async fn get_blobs_v2(
1364 &self,
1365 versioned_hashes: Vec<B256>,
1366 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1367 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1368 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1369 }
1370
1371 async fn get_blobs_v3(
1372 &self,
1373 versioned_hashes: Vec<B256>,
1374 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1375 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1376 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1377 }
1378}
1379
1380impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1381 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1382where
1383 EngineT: EngineTypes,
1384 Self: EngineApiServer<EngineT>,
1385{
1386 fn into_rpc_module(self) -> RpcModule<()> {
1387 EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1388 }
1389}
1390
1391impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1392 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1393where
1394 PayloadT: PayloadTypes,
1395{
1396 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1397 f.debug_struct("EngineApi").finish_non_exhaustive()
1398 }
1399}
1400
1401impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1402 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1403where
1404 PayloadT: PayloadTypes,
1405{
1406 fn clone(&self) -> Self {
1407 Self { inner: Arc::clone(&self.inner) }
1408 }
1409}
1410
1411struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1413 provider: Provider,
1415 chain_spec: Arc<ChainSpec>,
1417 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1419 payload_store: PayloadStore<PayloadT>,
1421 task_spawner: Runtime,
1423 metrics: EngineApiMetrics,
1425 client: ClientVersionV1,
1427 capabilities: EngineCapabilities,
1429 tx_pool: Pool,
1431 validator: Validator,
1433 accept_execution_requests_hash: bool,
1434 is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1436}
1437
1438#[cfg(test)]
1439mod tests {
1440 use super::*;
1441 use alloy_primitives::{Address, B256};
1442 use alloy_rpc_types_engine::{
1443 ClientCode, ClientVersionV1, PayloadAttributes, PayloadStatusEnum,
1444 };
1445 use assert_matches::assert_matches;
1446 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1447 use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1448 use reth_ethereum_engine_primitives::EthEngineTypes;
1449 use reth_ethereum_primitives::Block;
1450 use reth_network_api::{
1451 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1452 };
1453 use reth_node_ethereum::EthereumEngineValidator;
1454 use reth_payload_builder::test_utils::spawn_test_payload_service;
1455 use reth_provider::test_utils::MockEthProvider;
1456 use reth_tasks::Runtime;
1457 use reth_transaction_pool::noop::NoopTransactionPool;
1458 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1459
1460 fn setup_engine_api() -> (
1461 EngineApiTestHandle,
1462 EngineApi<
1463 Arc<MockEthProvider>,
1464 EthEngineTypes,
1465 NoopTransactionPool,
1466 EthereumEngineValidator,
1467 ChainSpec,
1468 >,
1469 ) {
1470 let client = ClientVersionV1 {
1471 code: ClientCode::RH,
1472 name: "Reth".to_string(),
1473 version: "v0.2.0-beta.5".to_string(),
1474 commit: "defa64b2".to_string(),
1475 };
1476
1477 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1478 let provider = Arc::new(MockEthProvider::default());
1479 let payload_store = spawn_test_payload_service();
1480 let (to_engine, engine_rx) = unbounded_channel();
1481 let task_executor = Runtime::test();
1482 let api = EngineApi::new(
1483 provider.clone(),
1484 chain_spec.clone(),
1485 ConsensusEngineHandle::new(to_engine),
1486 payload_store.into(),
1487 NoopTransactionPool::default(),
1488 task_executor,
1489 client,
1490 EngineCapabilities::default(),
1491 EthereumEngineValidator::new(chain_spec.clone()),
1492 false,
1493 NoopNetwork::default(),
1494 );
1495 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1496 (handle, api)
1497 }
1498
1499 #[tokio::test]
1500 async fn engine_client_version_v1() {
1501 let client = ClientVersionV1 {
1502 code: ClientCode::RH,
1503 name: "Reth".to_string(),
1504 version: "v0.2.0-beta.5".to_string(),
1505 commit: "defa64b2".to_string(),
1506 };
1507 let (_, api) = setup_engine_api();
1508 let res = api.get_client_version_v1(client.clone());
1509 assert_eq!(res.unwrap(), vec![client]);
1510 }
1511
    /// Test-side counterpart of the [`EngineApi`] returned by `setup_engine_api`.
    struct EngineApiTestHandle {
        /// Chain spec the API was constructed with; no test reads it yet.
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        /// Mock provider shared with the API, so tests can insert blocks it will serve.
        provider: Arc<MockEthProvider>,
        /// Receiving end of the channel whose sender was handed to the API's
        /// `ConsensusEngineHandle`.
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }
1518
1519 #[tokio::test]
1520 async fn forwards_responses_to_consensus_engine() {
1521 let (mut handle, api) = setup_engine_api();
1522
1523 tokio::spawn(async move {
1524 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1525 let execution_data = ExecutionData {
1526 payload: payload_v1.into(),
1527 sidecar: ExecutionPayloadSidecar::none(),
1528 };
1529
1530 api.new_payload_v1(execution_data).await.unwrap();
1531 });
1532 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1533 }
1534
1535 #[derive(Clone)]
1536 struct TestNetworkInfo {
1537 syncing: bool,
1538 }
1539
1540 impl NetworkInfo for TestNetworkInfo {
1541 fn local_addr(&self) -> std::net::SocketAddr {
1542 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1543 }
1544
1545 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1546 #[allow(deprecated)]
1547 Ok(NetworkStatus {
1548 client_version: "test".to_string(),
1549 protocol_version: 5,
1550 eth_protocol_info: EthProtocolInfo {
1551 network: 1,
1552 difficulty: None,
1553 genesis: Default::default(),
1554 config: Default::default(),
1555 head: Default::default(),
1556 },
1557 capabilities: vec![],
1558 })
1559 }
1560
1561 fn chain_id(&self) -> u64 {
1562 1
1563 }
1564
1565 fn is_syncing(&self) -> bool {
1566 self.syncing
1567 }
1568
1569 fn is_initially_syncing(&self) -> bool {
1570 self.syncing
1571 }
1572 }
1573
1574 #[tokio::test]
1575 async fn get_blobs_v3_returns_null_when_syncing() {
1576 let chain_spec: Arc<ChainSpec> =
1577 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1578 let provider = Arc::new(MockEthProvider::default());
1579 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1580 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1581
1582 let api = EngineApi::new(
1583 provider,
1584 chain_spec.clone(),
1585 ConsensusEngineHandle::new(to_engine),
1586 payload_store.into(),
1587 NoopTransactionPool::default(),
1588 Runtime::test(),
1589 ClientVersionV1 {
1590 code: ClientCode::RH,
1591 name: "Reth".to_string(),
1592 version: "v0.0.0-test".to_string(),
1593 commit: "test".to_string(),
1594 },
1595 EngineCapabilities::default(),
1596 EthereumEngineValidator::new(chain_spec),
1597 false,
1598 TestNetworkInfo { syncing: true },
1599 );
1600
1601 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1602 assert_matches!(res, Ok(None));
1603 }
1604
1605 #[tokio::test]
1606 async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
1607 let (mut handle, api) = setup_engine_api();
1608
1609 let state = ForkchoiceState {
1610 head_block_hash: B256::from([0x11; 32]),
1611 safe_block_hash: B256::ZERO,
1612 finalized_block_hash: B256::ZERO,
1613 };
1614 let payload_attributes = PayloadAttributes {
1615 timestamp: 1,
1616 prev_randao: B256::ZERO,
1617 suggested_fee_recipient: Address::ZERO,
1618 withdrawals: Some(vec![]),
1619 parent_beacon_block_root: None,
1621 slot_number: None,
1622 };
1623
1624 let api_task = tokio::spawn(async move {
1625 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1626 });
1627
1628 let request =
1629 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1630 .await
1631 .expect("timed out waiting for forkchoiceUpdated request")
1632 .expect("expected forkchoiceUpdated request");
1633 let response_tx = match request {
1634 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1635 assert!(
1636 payload_attrs.is_none(),
1637 "FCU for syncing state should be evaluated before payload attributes"
1638 );
1639 tx
1640 }
1641 other => panic!("unexpected engine message: {other:?}"),
1642 };
1643
1644 response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
1645
1646 let response = api_task
1647 .await
1648 .expect("api task should not panic")
1649 .expect("forkchoiceUpdatedV3 should return a syncing response");
1650 assert!(response.payload_status.is_syncing());
1651 assert!(response.payload_id.is_none());
1652 }
1653
1654 #[tokio::test]
1655 async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
1656 let (mut handle, api) = setup_engine_api();
1657
1658 let state = ForkchoiceState {
1659 head_block_hash: B256::from([0x22; 32]),
1660 safe_block_hash: B256::ZERO,
1661 finalized_block_hash: B256::ZERO,
1662 };
1663 let payload_attributes = PayloadAttributes {
1664 timestamp: 1,
1665 prev_randao: B256::ZERO,
1666 suggested_fee_recipient: Address::ZERO,
1667 withdrawals: Some(vec![]),
1668 parent_beacon_block_root: None,
1669 slot_number: None,
1670 };
1671
1672 let api_task = tokio::spawn(async move {
1673 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1674 });
1675
1676 let request =
1677 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1678 .await
1679 .expect("timed out waiting for forkchoiceUpdated request")
1680 .expect("expected forkchoiceUpdated request");
1681
1682 let response_tx = match request {
1683 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1684 assert!(
1685 payload_attrs.is_none(),
1686 "when attrs are invalid, API should first evaluate forkchoice without attrs"
1687 );
1688 tx
1689 }
1690 other => panic!("unexpected engine message: {other:?}"),
1691 };
1692
1693 response_tx
1694 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1695 PayloadStatusEnum::Valid,
1696 ))))
1697 .expect("send valid response");
1698
1699 let response = api_task.await.expect("api task should not panic");
1700 assert_matches!(
1701 response,
1702 Err(EngineApiError::EngineObjectValidationError(
1703 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
1704 ))
1705 );
1706
1707 match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
1708 .await
1709 {
1710 Err(_) | Ok(None) => {}
1711 Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
1712 panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
1713 }
1714 Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
1715 }
1716 }
1717
1718 mod get_payload_bodies {
1720 use super::*;
1721 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1722 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1723
1724 #[tokio::test]
1725 async fn invalid_params() {
1726 let (_, api) = setup_engine_api();
1727
1728 let by_range_tests = [
1729 (0, 0),
1731 (0, 1),
1732 (1, 0),
1733 ];
1734
1735 for (start, count) in by_range_tests {
1737 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1738 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1739 }
1740 }
1741
1742 #[tokio::test]
1743 async fn request_too_large() {
1744 let (_, api) = setup_engine_api();
1745
1746 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1747 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1748 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1749 }
1750
1751 #[tokio::test]
1752 async fn returns_payload_bodies() {
1753 let mut rng = generators::rng();
1754 let (handle, api) = setup_engine_api();
1755
1756 let (start, count) = (1, 10);
1757 let blocks = random_block_range(
1758 &mut rng,
1759 start..=start + count - 1,
1760 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1761 );
1762 handle
1763 .provider
1764 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1765
1766 let expected = blocks
1767 .iter()
1768 .cloned()
1769 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1770 .collect::<Vec<_>>();
1771
1772 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1773 assert_eq!(res, expected);
1774 }
1775
1776 #[tokio::test]
1777 async fn returns_payload_bodies_with_gaps() {
1778 let mut rng = generators::rng();
1779 let (handle, api) = setup_engine_api();
1780
1781 let (start, count) = (1, 100);
1782 let blocks = random_block_range(
1783 &mut rng,
1784 start..=start + count - 1,
1785 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1786 );
1787
1788 let first_missing_range = 26..=50;
1790 let second_missing_range = 76..=100;
1791 handle.provider.extend_blocks(
1792 blocks
1793 .iter()
1794 .filter(|b| {
1795 !first_missing_range.contains(&b.number) &&
1796 !second_missing_range.contains(&b.number)
1797 })
1798 .map(|b| (b.hash(), b.clone().into_block())),
1799 );
1800
1801 let expected = blocks
1802 .iter()
1803 .filter(|b| !second_missing_range.contains(&b.number))
1806 .cloned()
1807 .map(|b| {
1808 if first_missing_range.contains(&b.number) {
1809 None
1810 } else {
1811 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1812 }
1813 })
1814 .collect::<Vec<_>>();
1815
1816 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1817 assert_eq!(res, expected);
1818
1819 let expected = blocks
1820 .iter()
1821 .cloned()
1822 .map(|b| {
1825 if first_missing_range.contains(&b.number) ||
1826 second_missing_range.contains(&b.number)
1827 {
1828 None
1829 } else {
1830 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1831 }
1832 })
1833 .collect::<Vec<_>>();
1834
1835 let hashes = blocks.iter().map(|b| b.hash()).collect();
1836 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1837 assert_eq!(res, expected);
1838 }
1839 }
1840}