1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::NetworkInfo;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
/// One-shot channel sender that carries the outcome of an engine API call, i.e. an
/// [`EngineApiResult`] wrapping the success type `Ok`.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;

/// Upper bound on the number of payload bodies that may be requested in one
/// `get_payload_bodies_by_*` call; larger requests are rejected with
/// [`EngineApiError::PayloadRequestTooLarge`].
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// Upper bound on the number of versioned hashes accepted by one `get_blobs_*` call; larger
/// requests are rejected with [`EngineApiError::BlobRequestTooLarge`].
const MAX_BLOB_LIMIT: usize = 128;
48
/// Handle to the engine API implementation.
///
/// All state lives in a shared [`EngineApiInner`] behind an [`Arc`], so the handle itself is just
/// a reference-counted pointer.
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Shared state used by every method of this handle.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73 &self.inner.chain_spec
74 }
75}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
81 PayloadT: PayloadTypes,
82 Pool: TransactionPool + 'static,
83 Validator: EngineApiValidator<PayloadT>,
84 ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86 #[expect(clippy::too_many_arguments)]
88 pub fn new(
89 provider: Provider,
90 chain_spec: Arc<ChainSpec>,
91 beacon_consensus: ConsensusEngineHandle<PayloadT>,
92 payload_store: PayloadStore<PayloadT>,
93 tx_pool: Pool,
94 task_spawner: Runtime,
95 client: ClientVersionV1,
96 capabilities: EngineCapabilities,
97 validator: Validator,
98 accept_execution_requests_hash: bool,
99 network: impl NetworkInfo + 'static,
100 ) -> Self {
101 let is_syncing = Arc::new(move || network.is_syncing());
102 let inner = Arc::new(EngineApiInner {
103 provider,
104 chain_spec,
105 beacon_consensus,
106 payload_store,
107 task_spawner,
108 metrics: EngineApiMetrics::default(),
109 client,
110 capabilities,
111 tx_pool,
112 validator,
113 accept_execution_requests_hash,
114 is_syncing,
115 });
116 Self { inner }
117 }
118
119 pub fn get_client_version_v1(
121 &self,
122 _client: ClientVersionV1,
123 ) -> EngineApiResult<Vec<ClientVersionV1>> {
124 Ok(vec![self.inner.client.clone()])
125 }
126
127 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
129 Ok(self
130 .inner
131 .payload_store
132 .payload_timestamp(payload_id)
133 .await
134 .ok_or(EngineApiError::UnknownPayload)??)
135 }
136
137 pub async fn new_payload_v1(
140 &self,
141 payload: PayloadT::ExecutionData,
142 ) -> EngineApiResult<PayloadStatus> {
143 let payload_or_attrs = PayloadOrAttributes::<
144 '_,
145 PayloadT::ExecutionData,
146 PayloadT::PayloadAttributes,
147 >::from_execution_payload(&payload);
148
149 self.inner
150 .validator
151 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
154 }
155
156 pub async fn new_payload_v1_metered(
158 &self,
159 payload: PayloadT::ExecutionData,
160 ) -> EngineApiResult<PayloadStatus> {
161 let start = Instant::now();
162 let res = Self::new_payload_v1(self, payload).await;
163 let elapsed = start.elapsed();
164 self.inner.metrics.latency.new_payload_v1.record(elapsed);
165 res
166 }
167
168 pub async fn new_payload_v2(
170 &self,
171 payload: PayloadT::ExecutionData,
172 ) -> EngineApiResult<PayloadStatus> {
173 let payload_or_attrs = PayloadOrAttributes::<
174 '_,
175 PayloadT::ExecutionData,
176 PayloadT::PayloadAttributes,
177 >::from_execution_payload(&payload);
178 self.inner
179 .validator
180 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
181 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
182 }
183
184 pub async fn new_payload_v2_metered(
186 &self,
187 payload: PayloadT::ExecutionData,
188 ) -> EngineApiResult<PayloadStatus> {
189 let start = Instant::now();
190 let res = Self::new_payload_v2(self, payload).await;
191 let elapsed = start.elapsed();
192 self.inner.metrics.latency.new_payload_v2.record(elapsed);
193 res
194 }
195
196 pub async fn new_payload_v3(
198 &self,
199 payload: PayloadT::ExecutionData,
200 ) -> EngineApiResult<PayloadStatus> {
201 let payload_or_attrs = PayloadOrAttributes::<
202 '_,
203 PayloadT::ExecutionData,
204 PayloadT::PayloadAttributes,
205 >::from_execution_payload(&payload);
206 self.inner
207 .validator
208 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
209
210 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
211 }
212
213 pub async fn new_payload_v3_metered(
215 &self,
216 payload: PayloadT::ExecutionData,
217 ) -> RpcResult<PayloadStatus> {
218 let start = Instant::now();
219
220 let res = Self::new_payload_v3(self, payload).await;
221 let elapsed = start.elapsed();
222 self.inner.metrics.latency.new_payload_v3.record(elapsed);
223 Ok(res?)
224 }
225
226 pub async fn new_payload_v4(
228 &self,
229 payload: PayloadT::ExecutionData,
230 ) -> EngineApiResult<PayloadStatus> {
231 let payload_or_attrs = PayloadOrAttributes::<
232 '_,
233 PayloadT::ExecutionData,
234 PayloadT::PayloadAttributes,
235 >::from_execution_payload(&payload);
236 self.inner
237 .validator
238 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
239
240 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
241 }
242
243 pub async fn new_payload_v4_metered(
245 &self,
246 payload: PayloadT::ExecutionData,
247 ) -> RpcResult<PayloadStatus> {
248 let start = Instant::now();
249 let res = Self::new_payload_v4(self, payload).await;
250
251 let elapsed = start.elapsed();
252 self.inner.metrics.latency.new_payload_v4.record(elapsed);
253 Ok(res?)
254 }
255
256 pub fn accept_execution_requests_hash(&self) -> bool {
258 self.inner.accept_execution_requests_hash
259 }
260}
261
262impl<Provider, EngineT, Pool, Validator, ChainSpec>
263 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
264where
265 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
266 EngineT: EngineTypes,
267 Pool: TransactionPool + 'static,
268 Validator: EngineApiValidator<EngineT>,
269 ChainSpec: EthereumHardforks + Send + Sync + 'static,
270{
271 pub async fn fork_choice_updated_v1(
278 &self,
279 state: ForkchoiceState,
280 payload_attrs: Option<EngineT::PayloadAttributes>,
281 ) -> EngineApiResult<ForkchoiceUpdated> {
282 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
283 .await
284 }
285
286 pub async fn fork_choice_updated_v1_metered(
288 &self,
289 state: ForkchoiceState,
290 payload_attrs: Option<EngineT::PayloadAttributes>,
291 ) -> EngineApiResult<ForkchoiceUpdated> {
292 let start = Instant::now();
293 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
294 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
295 res
296 }
297
298 pub async fn fork_choice_updated_v2(
303 &self,
304 state: ForkchoiceState,
305 payload_attrs: Option<EngineT::PayloadAttributes>,
306 ) -> EngineApiResult<ForkchoiceUpdated> {
307 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
308 .await
309 }
310
311 pub async fn fork_choice_updated_v2_metered(
313 &self,
314 state: ForkchoiceState,
315 payload_attrs: Option<EngineT::PayloadAttributes>,
316 ) -> EngineApiResult<ForkchoiceUpdated> {
317 let start = Instant::now();
318 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
319 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
320 res
321 }
322
323 pub async fn fork_choice_updated_v3(
328 &self,
329 state: ForkchoiceState,
330 payload_attrs: Option<EngineT::PayloadAttributes>,
331 ) -> EngineApiResult<ForkchoiceUpdated> {
332 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
333 .await
334 }
335
336 pub async fn fork_choice_updated_v3_metered(
338 &self,
339 state: ForkchoiceState,
340 payload_attrs: Option<EngineT::PayloadAttributes>,
341 ) -> EngineApiResult<ForkchoiceUpdated> {
342 let start = Instant::now();
343 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
344 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
345 res
346 }
347
348 async fn get_built_payload(
350 &self,
351 payload_id: PayloadId,
352 ) -> EngineApiResult<EngineT::BuiltPayload> {
353 self.inner
354 .payload_store
355 .resolve(payload_id)
356 .await
357 .ok_or(EngineApiError::UnknownPayload)?
358 .map_err(|_| EngineApiError::UnknownPayload)
359 }
360
361 async fn get_payload_inner<R>(
364 &self,
365 payload_id: PayloadId,
366 version: EngineApiMessageVersion,
367 ) -> EngineApiResult<R>
368 where
369 EngineT::BuiltPayload: TryInto<R>,
370 {
371 let timestamp = self.get_payload_timestamp(payload_id).await?;
374 validate_payload_timestamp(
375 &self.inner.chain_spec,
376 version,
377 timestamp,
378 MessageValidationKind::GetPayload,
379 )?;
380
381 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
383 warn!(?version, "could not transform built payload");
384 EngineApiError::UnknownPayload
385 })
386 }
387
388 pub async fn get_payload_v1(
398 &self,
399 payload_id: PayloadId,
400 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
401 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
402 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
403 EngineApiError::UnknownPayload
404 })
405 }
406
407 pub async fn get_payload_v1_metered(
409 &self,
410 payload_id: PayloadId,
411 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
412 let start = Instant::now();
413 let res = Self::get_payload_v1(self, payload_id).await;
414 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
415 res
416 }
417
418 pub async fn get_payload_v2(
426 &self,
427 payload_id: PayloadId,
428 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
429 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
430 }
431
432 pub async fn get_payload_v2_metered(
434 &self,
435 payload_id: PayloadId,
436 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
437 let start = Instant::now();
438 let res = Self::get_payload_v2(self, payload_id).await;
439 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
440 res
441 }
442
443 pub async fn get_payload_v3(
451 &self,
452 payload_id: PayloadId,
453 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
454 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
455 }
456
457 pub async fn get_payload_v3_metered(
459 &self,
460 payload_id: PayloadId,
461 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
462 let start = Instant::now();
463 let res = Self::get_payload_v3(self, payload_id).await;
464 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
465 res
466 }
467
468 pub async fn get_payload_v4(
476 &self,
477 payload_id: PayloadId,
478 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
479 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
480 }
481
482 pub async fn get_payload_v4_metered(
484 &self,
485 payload_id: PayloadId,
486 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
487 let start = Instant::now();
488 let res = Self::get_payload_v4(self, payload_id).await;
489 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
490 res
491 }
492
493 pub async fn get_payload_v5(
503 &self,
504 payload_id: PayloadId,
505 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
506 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
507 }
508
509 pub async fn get_payload_v5_metered(
511 &self,
512 payload_id: PayloadId,
513 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
514 let start = Instant::now();
515 let res = Self::get_payload_v5(self, payload_id).await;
516 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
517 res
518 }
519
520 pub async fn get_payload_bodies_by_range_with<F, R>(
523 &self,
524 start: BlockNumber,
525 count: u64,
526 f: F,
527 ) -> EngineApiResult<Vec<Option<R>>>
528 where
529 F: Fn(Provider::Block) -> R + Send + 'static,
530 R: Send + 'static,
531 {
532 let (tx, rx) = oneshot::channel();
533 let inner = self.inner.clone();
534
535 self.inner.task_spawner.spawn_blocking_task(async move {
536 if count > MAX_PAYLOAD_BODIES_LIMIT {
537 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
538 return;
539 }
540
541 if start == 0 || count == 0 {
542 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
543 return;
544 }
545
546 let mut result = Vec::with_capacity(count as usize);
547
548 let mut end = start.saturating_add(count - 1);
550
551 if let Ok(best_block) = inner.provider.best_block_number()
554 && end > best_block {
555 end = best_block;
556 }
557
558 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
560 for num in start..=end {
561 if num < earliest_block {
562 result.push(None);
563 continue;
564 }
565 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
566 match block_result {
567 Ok(block) => {
568 result.push(block.map(&f));
569 }
570 Err(err) => {
571 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
572 return;
573 }
574 };
575 }
576 tx.send(Ok(result)).ok();
577 });
578
579 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
580 }
581
582 pub async fn get_payload_bodies_by_range_v1(
593 &self,
594 start: BlockNumber,
595 count: u64,
596 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
597 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
598 transactions: block.body().encoded_2718_transactions(),
599 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
600 })
601 .await
602 }
603
604 pub async fn get_payload_bodies_by_range_v1_metered(
606 &self,
607 start: BlockNumber,
608 count: u64,
609 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
610 let start_time = Instant::now();
611 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
612 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
613 res
614 }
615
616 pub async fn get_payload_bodies_by_range_v2(
620 &self,
621 start: BlockNumber,
622 count: u64,
623 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
624 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 {
625 transactions: block.body().encoded_2718_transactions(),
626 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
627 block_access_list: None,
628 })
629 .await
630 }
631
632 pub async fn get_payload_bodies_by_range_v2_metered(
634 &self,
635 start: BlockNumber,
636 count: u64,
637 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
638 let start_time = Instant::now();
639 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
640 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
641 res
642 }
643
644 pub async fn get_payload_bodies_by_hash_with<F, R>(
646 &self,
647 hashes: Vec<BlockHash>,
648 f: F,
649 ) -> EngineApiResult<Vec<Option<R>>>
650 where
651 F: Fn(Provider::Block) -> R + Send + 'static,
652 R: Send + 'static,
653 {
654 let len = hashes.len() as u64;
655 if len > MAX_PAYLOAD_BODIES_LIMIT {
656 return Err(EngineApiError::PayloadRequestTooLarge { len });
657 }
658
659 let (tx, rx) = oneshot::channel();
660 let inner = self.inner.clone();
661
662 self.inner.task_spawner.spawn_blocking_task(async move {
663 let mut result = Vec::with_capacity(hashes.len());
664 for hash in hashes {
665 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
666 match block_result {
667 Ok(block) => {
668 result.push(block.map(&f));
669 }
670 Err(err) => {
671 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
672 return;
673 }
674 }
675 }
676 tx.send(Ok(result)).ok();
677 });
678
679 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
680 }
681
682 pub async fn get_payload_bodies_by_hash_v1(
684 &self,
685 hashes: Vec<BlockHash>,
686 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
687 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
688 transactions: block.body().encoded_2718_transactions(),
689 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
690 })
691 .await
692 }
693
694 pub async fn get_payload_bodies_by_hash_v1_metered(
696 &self,
697 hashes: Vec<BlockHash>,
698 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
699 let start = Instant::now();
700 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
701 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
702 res
703 }
704
705 pub async fn get_payload_bodies_by_hash_v2(
709 &self,
710 hashes: Vec<BlockHash>,
711 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
712 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV2 {
713 transactions: block.body().encoded_2718_transactions(),
714 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
715 block_access_list: None,
716 })
717 .await
718 }
719
720 pub async fn get_payload_bodies_by_hash_v2_metered(
722 &self,
723 hashes: Vec<BlockHash>,
724 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
725 let start = Instant::now();
726 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
727 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
728 res
729 }
730
731 async fn validate_and_execute_forkchoice(
745 &self,
746 version: EngineApiMessageVersion,
747 state: ForkchoiceState,
748 payload_attrs: Option<EngineT::PayloadAttributes>,
749 ) -> EngineApiResult<ForkchoiceUpdated> {
750 if let Some(ref attrs) = payload_attrs {
751 let attr_validation_res =
752 self.inner.validator.ensure_well_formed_attributes(version, attrs);
753
754 if let Err(err) = attr_validation_res {
764 let fcu_res = self.inner.beacon_consensus.fork_choice_updated(state, None).await?;
765 if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
766 return Ok(fcu_res)
767 }
768 return Err(err.into())
769 }
770 }
771
772 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
773 }
774
775 pub fn capabilities(&self) -> &EngineCapabilities {
777 &self.inner.capabilities
778 }
779
780 fn get_blobs_v1(
781 &self,
782 versioned_hashes: Vec<B256>,
783 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
784 let current_timestamp =
786 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
787 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
788 return Err(EngineApiError::EngineObjectValidationError(
789 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
790 ));
791 }
792
793 if versioned_hashes.len() > MAX_BLOB_LIMIT {
794 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
795 }
796
797 self.inner
798 .tx_pool
799 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
800 .map_err(|err| EngineApiError::Internal(Box::new(err)))
801 }
802
803 pub fn get_blobs_v1_metered(
805 &self,
806 versioned_hashes: Vec<B256>,
807 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
808 let hashes_len = versioned_hashes.len();
809 let start = Instant::now();
810 let res = Self::get_blobs_v1(self, versioned_hashes);
811 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
812
813 if let Ok(blobs) = &res {
814 let blobs_found = blobs.iter().flatten().count();
815 let blobs_missed = hashes_len - blobs_found;
816
817 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
818 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
819 }
820
821 res
822 }
823
824 fn get_blobs_v2(
825 &self,
826 versioned_hashes: Vec<B256>,
827 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
828 let current_timestamp =
830 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
831 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
832 return Err(EngineApiError::EngineObjectValidationError(
833 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
834 ));
835 }
836
837 if versioned_hashes.len() > MAX_BLOB_LIMIT {
838 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
839 }
840
841 self.inner
842 .tx_pool
843 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
844 .map_err(|err| EngineApiError::Internal(Box::new(err)))
845 }
846
847 fn get_blobs_v3(
848 &self,
849 versioned_hashes: Vec<B256>,
850 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
851 let current_timestamp =
853 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
854 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
855 return Err(EngineApiError::EngineObjectValidationError(
856 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
857 ));
858 }
859
860 if versioned_hashes.len() > MAX_BLOB_LIMIT {
861 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
862 }
863
864 if (*self.inner.is_syncing)() {
866 return Ok(None)
867 }
868
869 self.inner
870 .tx_pool
871 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
872 .map(Some)
873 .map_err(|err| EngineApiError::Internal(Box::new(err)))
874 }
875
876 pub fn get_blobs_v2_metered(
878 &self,
879 versioned_hashes: Vec<B256>,
880 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
881 let hashes_len = versioned_hashes.len();
882 let start = Instant::now();
883 let res = Self::get_blobs_v2(self, versioned_hashes);
884 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
885
886 if let Ok(blobs) = &res {
887 let blobs_found = blobs.iter().flatten().count();
888
889 self.inner
890 .metrics
891 .blob_metrics
892 .get_blobs_requests_blobs_total
893 .increment(hashes_len as u64);
894 self.inner
895 .metrics
896 .blob_metrics
897 .get_blobs_requests_blobs_in_blobpool_total
898 .increment(blobs_found as u64);
899
900 if blobs_found == hashes_len {
901 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
902 } else {
903 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
904 }
905 } else {
906 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
907 }
908
909 res
910 }
911
912 pub fn get_blobs_v3_metered(
914 &self,
915 versioned_hashes: Vec<B256>,
916 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
917 let hashes_len = versioned_hashes.len();
918 let start = Instant::now();
919 let res = Self::get_blobs_v3(self, versioned_hashes);
920 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
921
922 if let Ok(Some(blobs)) = &res {
923 let blobs_found = blobs.iter().flatten().count();
924 let blobs_missed = hashes_len - blobs_found;
925
926 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
927 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
928 }
929
930 res
931 }
932}
933
934#[async_trait]
936impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
937 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
938where
939 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
940 EngineT: EngineTypes<ExecutionData = ExecutionData>,
941 Pool: TransactionPool + 'static,
942 Validator: EngineApiValidator<EngineT>,
943 ChainSpec: EthereumHardforks + Send + Sync + 'static,
944{
945 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
949 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
950 let payload =
951 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
952 Ok(self.new_payload_v1_metered(payload).await?)
953 }
954
955 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
958 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
959 let payload = ExecutionData {
960 payload: payload.into_payload(),
961 sidecar: ExecutionPayloadSidecar::none(),
962 };
963
964 Ok(self.new_payload_v2_metered(payload).await?)
965 }
966
967 async fn new_payload_v3(
970 &self,
971 payload: ExecutionPayloadV3,
972 versioned_hashes: Vec<B256>,
973 parent_beacon_block_root: B256,
974 ) -> RpcResult<PayloadStatus> {
975 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
976 let payload = ExecutionData {
977 payload: payload.into(),
978 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
979 versioned_hashes,
980 parent_beacon_block_root,
981 }),
982 };
983
984 Ok(self.new_payload_v3_metered(payload).await?)
985 }
986
987 async fn new_payload_v4(
990 &self,
991 payload: ExecutionPayloadV3,
992 versioned_hashes: Vec<B256>,
993 parent_beacon_block_root: B256,
994 requests: RequestsOrHash,
995 ) -> RpcResult<PayloadStatus> {
996 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
997
998 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1000 return Err(EngineApiError::UnexpectedRequestsHash.into());
1001 }
1002
1003 let payload = ExecutionData {
1004 payload: payload.into(),
1005 sidecar: ExecutionPayloadSidecar::v4(
1006 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1007 PraguePayloadFields { requests },
1008 ),
1009 };
1010
1011 Ok(self.new_payload_v4_metered(payload).await?)
1012 }
1013
1014 async fn new_payload_v5(
1020 &self,
1021 _payload: ExecutionPayloadV4,
1022 _versioned_hashes: Vec<B256>,
1023 _parent_beacon_block_root: B256,
1024 _execution_requests: RequestsOrHash,
1025 ) -> RpcResult<PayloadStatus> {
1026 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1027 Err(EngineApiError::EngineObjectValidationError(
1028 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1029 ))?
1030 }
1031
1032 async fn fork_choice_updated_v1(
1037 &self,
1038 fork_choice_state: ForkchoiceState,
1039 payload_attributes: Option<EngineT::PayloadAttributes>,
1040 ) -> RpcResult<ForkchoiceUpdated> {
1041 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1042 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1043 }
1044
1045 async fn fork_choice_updated_v2(
1048 &self,
1049 fork_choice_state: ForkchoiceState,
1050 payload_attributes: Option<EngineT::PayloadAttributes>,
1051 ) -> RpcResult<ForkchoiceUpdated> {
1052 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1053 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1054 }
1055
1056 async fn fork_choice_updated_v3(
1060 &self,
1061 fork_choice_state: ForkchoiceState,
1062 payload_attributes: Option<EngineT::PayloadAttributes>,
1063 ) -> RpcResult<ForkchoiceUpdated> {
1064 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1065 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1066 }
1067
1068 async fn get_payload_v1(
1080 &self,
1081 payload_id: PayloadId,
1082 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1083 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1084 Ok(self.get_payload_v1_metered(payload_id).await?)
1085 }
1086
1087 async fn get_payload_v2(
1097 &self,
1098 payload_id: PayloadId,
1099 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1100 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1101 Ok(self.get_payload_v2_metered(payload_id).await?)
1102 }
1103
1104 async fn get_payload_v3(
1114 &self,
1115 payload_id: PayloadId,
1116 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1117 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1118 Ok(self.get_payload_v3_metered(payload_id).await?)
1119 }
1120
1121 async fn get_payload_v4(
1131 &self,
1132 payload_id: PayloadId,
1133 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1134 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1135 Ok(self.get_payload_v4_metered(payload_id).await?)
1136 }
1137
1138 async fn get_payload_v5(
1148 &self,
1149 payload_id: PayloadId,
1150 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1151 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1152 Ok(self.get_payload_v5_metered(payload_id).await?)
1153 }
1154
1155 async fn get_payload_v6(
1161 &self,
1162 _payload_id: PayloadId,
1163 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1164 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1165 Err(EngineApiError::EngineObjectValidationError(
1166 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1167 ))?
1168 }
1169
1170 async fn get_payload_bodies_by_hash_v1(
1173 &self,
1174 block_hashes: Vec<BlockHash>,
1175 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1176 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1177 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1178 }
1179
1180 async fn get_payload_bodies_by_hash_v2(
1186 &self,
1187 block_hashes: Vec<BlockHash>,
1188 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1189 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1190 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1191 }
1192
1193 async fn get_payload_bodies_by_range_v1(
1210 &self,
1211 start: U64,
1212 count: U64,
1213 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1214 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1215 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1216 }
1217
1218 async fn get_payload_bodies_by_range_v2(
1224 &self,
1225 start: U64,
1226 count: U64,
1227 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1228 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1229 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1230 }
1231
1232 async fn get_client_version_v1(
1236 &self,
1237 client: ClientVersionV1,
1238 ) -> RpcResult<Vec<ClientVersionV1>> {
1239 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1240 Ok(Self::get_client_version_v1(self, client)?)
1241 }
1242
1243 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1246 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1247
1248 let el_caps = self.capabilities();
1249 el_caps.log_capability_mismatches(&capabilities);
1250
1251 Ok(el_caps.list())
1252 }
1253
1254 async fn get_blobs_v1(
1255 &self,
1256 versioned_hashes: Vec<B256>,
1257 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1258 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1259 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1260 }
1261
1262 async fn get_blobs_v2(
1263 &self,
1264 versioned_hashes: Vec<B256>,
1265 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1266 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1267 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1268 }
1269
1270 async fn get_blobs_v3(
1271 &self,
1272 versioned_hashes: Vec<B256>,
1273 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1274 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1275 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1276 }
1277}
1278
1279impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1280 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1281where
1282 EngineT: EngineTypes,
1283 Self: EngineApiServer<EngineT>,
1284{
1285 fn into_rpc_module(self) -> RpcModule<()> {
1286 EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1287 }
1288}
1289
1290impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1291 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1292where
1293 PayloadT: PayloadTypes,
1294{
1295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1296 f.debug_struct("EngineApi").finish_non_exhaustive()
1297 }
1298}
1299
1300impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1301 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1302where
1303 PayloadT: PayloadTypes,
1304{
1305 fn clone(&self) -> Self {
1306 Self { inner: Arc::clone(&self.inner) }
1307 }
1308}
1309
/// Internal state of [`EngineApi`], which holds it behind an `Arc` so that cloning the API
/// handle only bumps a reference count.
struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Generic provider handle. NOTE(review): presumably used for block/header/state lookups
    /// (the file imports `BlockReader`, `HeaderProvider`, `StateProviderFactory`) — the actual
    /// bounds live on the `impl` blocks.
    provider: Provider,
    /// Shared chain specification.
    chain_spec: Arc<ChainSpec>,
    /// Handle for sending requests to the consensus engine.
    beacon_consensus: ConsensusEngineHandle<PayloadT>,
    /// Payload store for `PayloadT`. NOTE(review): presumably used to resolve built payloads
    /// by id — confirm against the `get_payload_*` handlers.
    payload_store: PayloadStore<PayloadT>,
    /// Runtime used for spawning tasks.
    task_spawner: Runtime,
    /// Metrics for engine API calls.
    metrics: EngineApiMetrics,
    /// Client version information for this node.
    client: ClientVersionV1,
    /// Engine capabilities of this node.
    capabilities: EngineCapabilities,
    /// Generic transaction pool handle.
    tx_pool: Pool,
    /// Generic validator. NOTE(review): presumably validates engine API objects (payloads,
    /// payload attributes) — confirm against the `impl` bounds.
    validator: Validator,
    /// NOTE(review): by its name, toggles whether an execution-requests *hash* is accepted in
    /// place of full requests — confirm where it is read.
    accept_execution_requests_hash: bool,
    /// Type-erased, thread-safe callback returning a `bool`; by its name, reports whether the
    /// node is currently syncing.
    is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
}
1336
1337#[cfg(test)]
1338mod tests {
1339 use super::*;
1340 use alloy_primitives::{Address, B256};
1341 use alloy_rpc_types_engine::{
1342 ClientCode, ClientVersionV1, PayloadAttributes, PayloadStatusEnum,
1343 };
1344 use assert_matches::assert_matches;
1345 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1346 use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1347 use reth_ethereum_engine_primitives::EthEngineTypes;
1348 use reth_ethereum_primitives::Block;
1349 use reth_network_api::{
1350 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1351 };
1352 use reth_node_ethereum::EthereumEngineValidator;
1353 use reth_payload_builder::test_utils::spawn_test_payload_service;
1354 use reth_provider::test_utils::MockEthProvider;
1355 use reth_tasks::Runtime;
1356 use reth_transaction_pool::noop::NoopTransactionPool;
1357 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1358
1359 fn setup_engine_api() -> (
1360 EngineApiTestHandle,
1361 EngineApi<
1362 Arc<MockEthProvider>,
1363 EthEngineTypes,
1364 NoopTransactionPool,
1365 EthereumEngineValidator,
1366 ChainSpec,
1367 >,
1368 ) {
1369 let client = ClientVersionV1 {
1370 code: ClientCode::RH,
1371 name: "Reth".to_string(),
1372 version: "v0.2.0-beta.5".to_string(),
1373 commit: "defa64b2".to_string(),
1374 };
1375
1376 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1377 let provider = Arc::new(MockEthProvider::default());
1378 let payload_store = spawn_test_payload_service();
1379 let (to_engine, engine_rx) = unbounded_channel();
1380 let task_executor = Runtime::test();
1381 let api = EngineApi::new(
1382 provider.clone(),
1383 chain_spec.clone(),
1384 ConsensusEngineHandle::new(to_engine),
1385 payload_store.into(),
1386 NoopTransactionPool::default(),
1387 task_executor,
1388 client,
1389 EngineCapabilities::default(),
1390 EthereumEngineValidator::new(chain_spec.clone()),
1391 false,
1392 NoopNetwork::default(),
1393 );
1394 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1395 (handle, api)
1396 }
1397
1398 #[tokio::test]
1399 async fn engine_client_version_v1() {
1400 let client = ClientVersionV1 {
1401 code: ClientCode::RH,
1402 name: "Reth".to_string(),
1403 version: "v0.2.0-beta.5".to_string(),
1404 commit: "defa64b2".to_string(),
1405 };
1406 let (_, api) = setup_engine_api();
1407 let res = api.get_client_version_v1(client.clone());
1408 assert_eq!(res.unwrap(), vec![client]);
1409 }
1410
    /// Test-side counterpart of the [`EngineApi`] built by `setup_engine_api`.
    struct EngineApiTestHandle {
        /// Chain spec the API was built with; stored by `setup_engine_api` but not read by the
        /// tests in this module, hence the `dead_code` allowance.
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        /// Mock provider shared with the API, so tests can insert blocks.
        provider: Arc<MockEthProvider>,
        /// Receiving end of the channel the API uses to send messages to the engine.
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }
1417
1418 #[tokio::test]
1419 async fn forwards_responses_to_consensus_engine() {
1420 let (mut handle, api) = setup_engine_api();
1421
1422 tokio::spawn(async move {
1423 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1424 let execution_data = ExecutionData {
1425 payload: payload_v1.into(),
1426 sidecar: ExecutionPayloadSidecar::none(),
1427 };
1428
1429 api.new_payload_v1(execution_data).await.unwrap();
1430 });
1431 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1432 }
1433
    /// Minimal [`NetworkInfo`] stand-in whose sync state is fixed at construction.
    #[derive(Clone)]
    struct TestNetworkInfo {
        /// Value reported by both `is_syncing` and `is_initially_syncing`.
        syncing: bool,
    }
1438
1439 impl NetworkInfo for TestNetworkInfo {
1440 fn local_addr(&self) -> std::net::SocketAddr {
1441 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1442 }
1443
1444 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1445 #[allow(deprecated)]
1446 Ok(NetworkStatus {
1447 client_version: "test".to_string(),
1448 protocol_version: 5,
1449 eth_protocol_info: EthProtocolInfo {
1450 network: 1,
1451 difficulty: None,
1452 genesis: Default::default(),
1453 config: Default::default(),
1454 head: Default::default(),
1455 },
1456 capabilities: vec![],
1457 })
1458 }
1459
1460 fn chain_id(&self) -> u64 {
1461 1
1462 }
1463
1464 fn is_syncing(&self) -> bool {
1465 self.syncing
1466 }
1467
1468 fn is_initially_syncing(&self) -> bool {
1469 self.syncing
1470 }
1471 }
1472
1473 #[tokio::test]
1474 async fn get_blobs_v3_returns_null_when_syncing() {
1475 let chain_spec: Arc<ChainSpec> =
1476 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1477 let provider = Arc::new(MockEthProvider::default());
1478 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1479 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1480
1481 let api = EngineApi::new(
1482 provider,
1483 chain_spec.clone(),
1484 ConsensusEngineHandle::new(to_engine),
1485 payload_store.into(),
1486 NoopTransactionPool::default(),
1487 Runtime::test(),
1488 ClientVersionV1 {
1489 code: ClientCode::RH,
1490 name: "Reth".to_string(),
1491 version: "v0.0.0-test".to_string(),
1492 commit: "test".to_string(),
1493 },
1494 EngineCapabilities::default(),
1495 EthereumEngineValidator::new(chain_spec),
1496 false,
1497 TestNetworkInfo { syncing: true },
1498 );
1499
1500 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1501 assert_matches!(res, Ok(None));
1502 }
1503
1504 #[tokio::test]
1505 async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
1506 let (mut handle, api) = setup_engine_api();
1507
1508 let state = ForkchoiceState {
1509 head_block_hash: B256::from([0x11; 32]),
1510 safe_block_hash: B256::ZERO,
1511 finalized_block_hash: B256::ZERO,
1512 };
1513 let payload_attributes = PayloadAttributes {
1514 timestamp: 1,
1515 prev_randao: B256::ZERO,
1516 suggested_fee_recipient: Address::ZERO,
1517 withdrawals: Some(vec![]),
1518 parent_beacon_block_root: None,
1520 };
1521
1522 let api_task = tokio::spawn(async move {
1523 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1524 });
1525
1526 let request =
1527 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1528 .await
1529 .expect("timed out waiting for forkchoiceUpdated request")
1530 .expect("expected forkchoiceUpdated request");
1531 let response_tx = match request {
1532 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1533 assert!(
1534 payload_attrs.is_none(),
1535 "FCU for syncing state should be evaluated before payload attributes"
1536 );
1537 tx
1538 }
1539 other => panic!("unexpected engine message: {other:?}"),
1540 };
1541
1542 response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
1543
1544 let response = api_task
1545 .await
1546 .expect("api task should not panic")
1547 .expect("forkchoiceUpdatedV3 should return a syncing response");
1548 assert!(response.payload_status.is_syncing());
1549 assert!(response.payload_id.is_none());
1550 }
1551
1552 #[tokio::test]
1553 async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
1554 let (mut handle, api) = setup_engine_api();
1555
1556 let state = ForkchoiceState {
1557 head_block_hash: B256::from([0x22; 32]),
1558 safe_block_hash: B256::ZERO,
1559 finalized_block_hash: B256::ZERO,
1560 };
1561 let payload_attributes = PayloadAttributes {
1562 timestamp: 1,
1563 prev_randao: B256::ZERO,
1564 suggested_fee_recipient: Address::ZERO,
1565 withdrawals: Some(vec![]),
1566 parent_beacon_block_root: None,
1567 };
1568
1569 let api_task = tokio::spawn(async move {
1570 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1571 });
1572
1573 let request =
1574 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1575 .await
1576 .expect("timed out waiting for forkchoiceUpdated request")
1577 .expect("expected forkchoiceUpdated request");
1578
1579 let response_tx = match request {
1580 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1581 assert!(
1582 payload_attrs.is_none(),
1583 "when attrs are invalid, API should first evaluate forkchoice without attrs"
1584 );
1585 tx
1586 }
1587 other => panic!("unexpected engine message: {other:?}"),
1588 };
1589
1590 response_tx
1591 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1592 PayloadStatusEnum::Valid,
1593 ))))
1594 .expect("send valid response");
1595
1596 let response = api_task.await.expect("api task should not panic");
1597 assert_matches!(
1598 response,
1599 Err(EngineApiError::EngineObjectValidationError(
1600 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
1601 ))
1602 );
1603
1604 match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
1605 .await
1606 {
1607 Err(_) | Ok(None) => {}
1608 Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
1609 panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
1610 }
1611 Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
1612 }
1613 }
1614
1615 mod get_payload_bodies {
1617 use super::*;
1618 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1619 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1620
1621 #[tokio::test]
1622 async fn invalid_params() {
1623 let (_, api) = setup_engine_api();
1624
1625 let by_range_tests = [
1626 (0, 0),
1628 (0, 1),
1629 (1, 0),
1630 ];
1631
1632 for (start, count) in by_range_tests {
1634 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1635 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1636 }
1637 }
1638
1639 #[tokio::test]
1640 async fn request_too_large() {
1641 let (_, api) = setup_engine_api();
1642
1643 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1644 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1645 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1646 }
1647
1648 #[tokio::test]
1649 async fn returns_payload_bodies() {
1650 let mut rng = generators::rng();
1651 let (handle, api) = setup_engine_api();
1652
1653 let (start, count) = (1, 10);
1654 let blocks = random_block_range(
1655 &mut rng,
1656 start..=start + count - 1,
1657 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1658 );
1659 handle
1660 .provider
1661 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1662
1663 let expected = blocks
1664 .iter()
1665 .cloned()
1666 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1667 .collect::<Vec<_>>();
1668
1669 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1670 assert_eq!(res, expected);
1671 }
1672
1673 #[tokio::test]
1674 async fn returns_payload_bodies_with_gaps() {
1675 let mut rng = generators::rng();
1676 let (handle, api) = setup_engine_api();
1677
1678 let (start, count) = (1, 100);
1679 let blocks = random_block_range(
1680 &mut rng,
1681 start..=start + count - 1,
1682 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1683 );
1684
1685 let first_missing_range = 26..=50;
1687 let second_missing_range = 76..=100;
1688 handle.provider.extend_blocks(
1689 blocks
1690 .iter()
1691 .filter(|b| {
1692 !first_missing_range.contains(&b.number) &&
1693 !second_missing_range.contains(&b.number)
1694 })
1695 .map(|b| (b.hash(), b.clone().into_block())),
1696 );
1697
1698 let expected = blocks
1699 .iter()
1700 .filter(|b| !second_missing_range.contains(&b.number))
1703 .cloned()
1704 .map(|b| {
1705 if first_missing_range.contains(&b.number) {
1706 None
1707 } else {
1708 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1709 }
1710 })
1711 .collect::<Vec<_>>();
1712
1713 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1714 assert_eq!(res, expected);
1715
1716 let expected = blocks
1717 .iter()
1718 .cloned()
1719 .map(|b| {
1722 if first_missing_range.contains(&b.number) ||
1723 second_missing_range.contains(&b.number)
1724 {
1725 None
1726 } else {
1727 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1728 }
1729 })
1730 .collect::<Vec<_>>();
1731
1732 let hashes = blocks.iter().map(|b| b.hash()).collect();
1733 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1734 assert_eq!(res, expected);
1735 }
1736 }
1737}