1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::NetworkInfo;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
40pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
43const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
45
46const MAX_BLOB_LIMIT: usize = 128;
48
49pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
65 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
66}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73 &self.inner.chain_spec
74 }
75}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
81 PayloadT: PayloadTypes,
82 Pool: TransactionPool + 'static,
83 Validator: EngineApiValidator<PayloadT>,
84 ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86 #[expect(clippy::too_many_arguments)]
88 pub fn new(
89 provider: Provider,
90 chain_spec: Arc<ChainSpec>,
91 beacon_consensus: ConsensusEngineHandle<PayloadT>,
92 payload_store: PayloadStore<PayloadT>,
93 tx_pool: Pool,
94 task_spawner: Runtime,
95 client: ClientVersionV1,
96 capabilities: EngineCapabilities,
97 validator: Validator,
98 accept_execution_requests_hash: bool,
99 network: impl NetworkInfo + 'static,
100 ) -> Self {
101 let is_syncing = Arc::new(move || network.is_syncing());
102 let inner = Arc::new(EngineApiInner {
103 provider,
104 chain_spec,
105 beacon_consensus,
106 payload_store,
107 task_spawner,
108 metrics: EngineApiMetrics::default(),
109 client,
110 capabilities,
111 tx_pool,
112 validator,
113 accept_execution_requests_hash,
114 is_syncing,
115 });
116 Self { inner }
117 }
118
119 pub fn get_client_version_v1(
121 &self,
122 _client: ClientVersionV1,
123 ) -> EngineApiResult<Vec<ClientVersionV1>> {
124 Ok(vec![self.inner.client.clone()])
125 }
126
127 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
129 Ok(self
130 .inner
131 .payload_store
132 .payload_timestamp(payload_id)
133 .await
134 .ok_or(EngineApiError::UnknownPayload)??)
135 }
136
137 pub async fn new_payload_v1(
140 &self,
141 payload: PayloadT::ExecutionData,
142 ) -> EngineApiResult<PayloadStatus> {
143 let payload_or_attrs = PayloadOrAttributes::<
144 '_,
145 PayloadT::ExecutionData,
146 PayloadT::PayloadAttributes,
147 >::from_execution_payload(&payload);
148
149 self.inner
150 .validator
151 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
154 }
155
156 pub async fn new_payload_v1_metered(
158 &self,
159 payload: PayloadT::ExecutionData,
160 ) -> EngineApiResult<PayloadStatus> {
161 let start = Instant::now();
162 let res = Self::new_payload_v1(self, payload).await;
163 let elapsed = start.elapsed();
164 self.inner.metrics.latency.new_payload_v1.record(elapsed);
165 res
166 }
167
168 pub async fn new_payload_v2(
170 &self,
171 payload: PayloadT::ExecutionData,
172 ) -> EngineApiResult<PayloadStatus> {
173 let payload_or_attrs = PayloadOrAttributes::<
174 '_,
175 PayloadT::ExecutionData,
176 PayloadT::PayloadAttributes,
177 >::from_execution_payload(&payload);
178 self.inner
179 .validator
180 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
181 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
182 }
183
184 pub async fn new_payload_v2_metered(
186 &self,
187 payload: PayloadT::ExecutionData,
188 ) -> EngineApiResult<PayloadStatus> {
189 let start = Instant::now();
190 let res = Self::new_payload_v2(self, payload).await;
191 let elapsed = start.elapsed();
192 self.inner.metrics.latency.new_payload_v2.record(elapsed);
193 res
194 }
195
196 pub async fn new_payload_v3(
198 &self,
199 payload: PayloadT::ExecutionData,
200 ) -> EngineApiResult<PayloadStatus> {
201 let payload_or_attrs = PayloadOrAttributes::<
202 '_,
203 PayloadT::ExecutionData,
204 PayloadT::PayloadAttributes,
205 >::from_execution_payload(&payload);
206 self.inner
207 .validator
208 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
209
210 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
211 }
212
213 pub async fn new_payload_v3_metered(
215 &self,
216 payload: PayloadT::ExecutionData,
217 ) -> RpcResult<PayloadStatus> {
218 let start = Instant::now();
219
220 let res = Self::new_payload_v3(self, payload).await;
221 let elapsed = start.elapsed();
222 self.inner.metrics.latency.new_payload_v3.record(elapsed);
223 Ok(res?)
224 }
225
226 pub async fn new_payload_v4(
228 &self,
229 payload: PayloadT::ExecutionData,
230 ) -> EngineApiResult<PayloadStatus> {
231 let payload_or_attrs = PayloadOrAttributes::<
232 '_,
233 PayloadT::ExecutionData,
234 PayloadT::PayloadAttributes,
235 >::from_execution_payload(&payload);
236 self.inner
237 .validator
238 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
239
240 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
241 }
242
243 pub async fn new_payload_v4_metered(
245 &self,
246 payload: PayloadT::ExecutionData,
247 ) -> RpcResult<PayloadStatus> {
248 let start = Instant::now();
249 let res = Self::new_payload_v4(self, payload).await;
250
251 let elapsed = start.elapsed();
252 self.inner.metrics.latency.new_payload_v4.record(elapsed);
253 Ok(res?)
254 }
255
256 pub fn accept_execution_requests_hash(&self) -> bool {
258 self.inner.accept_execution_requests_hash
259 }
260}
261
262impl<Provider, EngineT, Pool, Validator, ChainSpec>
263 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
264where
265 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
266 EngineT: EngineTypes,
267 Pool: TransactionPool + 'static,
268 Validator: EngineApiValidator<EngineT>,
269 ChainSpec: EthereumHardforks + Send + Sync + 'static,
270{
271 pub async fn fork_choice_updated_v1(
278 &self,
279 state: ForkchoiceState,
280 payload_attrs: Option<EngineT::PayloadAttributes>,
281 ) -> EngineApiResult<ForkchoiceUpdated> {
282 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
283 .await
284 }
285
286 pub async fn fork_choice_updated_v1_metered(
288 &self,
289 state: ForkchoiceState,
290 payload_attrs: Option<EngineT::PayloadAttributes>,
291 ) -> EngineApiResult<ForkchoiceUpdated> {
292 let start = Instant::now();
293 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
294 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
295 res
296 }
297
298 pub async fn fork_choice_updated_v2(
303 &self,
304 state: ForkchoiceState,
305 payload_attrs: Option<EngineT::PayloadAttributes>,
306 ) -> EngineApiResult<ForkchoiceUpdated> {
307 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
308 .await
309 }
310
311 pub async fn fork_choice_updated_v2_metered(
313 &self,
314 state: ForkchoiceState,
315 payload_attrs: Option<EngineT::PayloadAttributes>,
316 ) -> EngineApiResult<ForkchoiceUpdated> {
317 let start = Instant::now();
318 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
319 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
320 res
321 }
322
323 pub async fn fork_choice_updated_v3(
328 &self,
329 state: ForkchoiceState,
330 payload_attrs: Option<EngineT::PayloadAttributes>,
331 ) -> EngineApiResult<ForkchoiceUpdated> {
332 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
333 .await
334 }
335
336 pub async fn fork_choice_updated_v3_metered(
338 &self,
339 state: ForkchoiceState,
340 payload_attrs: Option<EngineT::PayloadAttributes>,
341 ) -> EngineApiResult<ForkchoiceUpdated> {
342 let start = Instant::now();
343 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
344 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
345 res
346 }
347
348 async fn get_built_payload(
350 &self,
351 payload_id: PayloadId,
352 ) -> EngineApiResult<EngineT::BuiltPayload> {
353 self.inner
354 .payload_store
355 .resolve(payload_id)
356 .await
357 .ok_or(EngineApiError::UnknownPayload)?
358 .map_err(|_| EngineApiError::UnknownPayload)
359 }
360
361 async fn get_payload_inner<R>(
364 &self,
365 payload_id: PayloadId,
366 version: EngineApiMessageVersion,
367 ) -> EngineApiResult<R>
368 where
369 EngineT::BuiltPayload: TryInto<R>,
370 {
371 let timestamp = self.get_payload_timestamp(payload_id).await?;
374 validate_payload_timestamp(
375 &self.inner.chain_spec,
376 version,
377 timestamp,
378 MessageValidationKind::GetPayload,
379 )?;
380
381 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
383 warn!(?version, "could not transform built payload");
384 EngineApiError::UnknownPayload
385 })
386 }
387
388 pub async fn get_payload_v1(
398 &self,
399 payload_id: PayloadId,
400 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
401 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
402 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
403 EngineApiError::UnknownPayload
404 })
405 }
406
407 pub async fn get_payload_v1_metered(
409 &self,
410 payload_id: PayloadId,
411 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
412 let start = Instant::now();
413 let res = Self::get_payload_v1(self, payload_id).await;
414 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
415 res
416 }
417
418 pub async fn get_payload_v2(
426 &self,
427 payload_id: PayloadId,
428 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
429 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
430 }
431
432 pub async fn get_payload_v2_metered(
434 &self,
435 payload_id: PayloadId,
436 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
437 let start = Instant::now();
438 let res = Self::get_payload_v2(self, payload_id).await;
439 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
440 res
441 }
442
443 pub async fn get_payload_v3(
451 &self,
452 payload_id: PayloadId,
453 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
454 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
455 }
456
457 pub async fn get_payload_v3_metered(
459 &self,
460 payload_id: PayloadId,
461 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
462 let start = Instant::now();
463 let res = Self::get_payload_v3(self, payload_id).await;
464 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
465 res
466 }
467
468 pub async fn get_payload_v4(
476 &self,
477 payload_id: PayloadId,
478 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
479 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
480 }
481
482 pub async fn get_payload_v4_metered(
484 &self,
485 payload_id: PayloadId,
486 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
487 let start = Instant::now();
488 let res = Self::get_payload_v4(self, payload_id).await;
489 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
490 res
491 }
492
493 pub async fn get_payload_v5(
503 &self,
504 payload_id: PayloadId,
505 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
506 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
507 }
508
509 pub async fn get_payload_v5_metered(
511 &self,
512 payload_id: PayloadId,
513 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
514 let start = Instant::now();
515 let res = Self::get_payload_v5(self, payload_id).await;
516 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
517 res
518 }
519
520 pub async fn get_payload_bodies_by_range_with<F, R>(
523 &self,
524 start: BlockNumber,
525 count: u64,
526 f: F,
527 ) -> EngineApiResult<Vec<Option<R>>>
528 where
529 F: Fn(Provider::Block) -> R + Send + 'static,
530 R: Send + 'static,
531 {
532 let (tx, rx) = oneshot::channel();
533 let inner = self.inner.clone();
534
535 self.inner.task_spawner.spawn_blocking_task(async move {
536 if count > MAX_PAYLOAD_BODIES_LIMIT {
537 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
538 return;
539 }
540
541 if start == 0 || count == 0 {
542 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
543 return;
544 }
545
546 let mut result = Vec::with_capacity(count as usize);
547
548 let mut end = start.saturating_add(count - 1);
550
551 if let Ok(best_block) = inner.provider.best_block_number()
554 && end > best_block {
555 end = best_block;
556 }
557
558 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
560 for num in start..=end {
561 if num < earliest_block {
562 result.push(None);
563 continue;
564 }
565 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
566 match block_result {
567 Ok(block) => {
568 result.push(block.map(&f));
569 }
570 Err(err) => {
571 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
572 return;
573 }
574 };
575 }
576 tx.send(Ok(result)).ok();
577 });
578
579 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
580 }
581
582 pub async fn get_payload_bodies_by_range_v1(
593 &self,
594 start: BlockNumber,
595 count: u64,
596 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
597 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
598 transactions: block.body().encoded_2718_transactions(),
599 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
600 })
601 .await
602 }
603
604 pub async fn get_payload_bodies_by_range_v1_metered(
606 &self,
607 start: BlockNumber,
608 count: u64,
609 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
610 let start_time = Instant::now();
611 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
612 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
613 res
614 }
615
616 pub async fn get_payload_bodies_by_range_v2(
620 &self,
621 start: BlockNumber,
622 count: u64,
623 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
624 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 {
625 transactions: block.body().encoded_2718_transactions(),
626 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
627 block_access_list: None,
628 })
629 .await
630 }
631
632 pub async fn get_payload_bodies_by_range_v2_metered(
634 &self,
635 start: BlockNumber,
636 count: u64,
637 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
638 let start_time = Instant::now();
639 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
640 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
641 res
642 }
643
644 pub async fn get_payload_bodies_by_hash_with<F, R>(
646 &self,
647 hashes: Vec<BlockHash>,
648 f: F,
649 ) -> EngineApiResult<Vec<Option<R>>>
650 where
651 F: Fn(Provider::Block) -> R + Send + 'static,
652 R: Send + 'static,
653 {
654 let len = hashes.len() as u64;
655 if len > MAX_PAYLOAD_BODIES_LIMIT {
656 return Err(EngineApiError::PayloadRequestTooLarge { len });
657 }
658
659 let (tx, rx) = oneshot::channel();
660 let inner = self.inner.clone();
661
662 self.inner.task_spawner.spawn_blocking_task(async move {
663 let mut result = Vec::with_capacity(hashes.len());
664 for hash in hashes {
665 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
666 match block_result {
667 Ok(block) => {
668 result.push(block.map(&f));
669 }
670 Err(err) => {
671 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
672 return;
673 }
674 }
675 }
676 tx.send(Ok(result)).ok();
677 });
678
679 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
680 }
681
682 pub async fn get_payload_bodies_by_hash_v1(
684 &self,
685 hashes: Vec<BlockHash>,
686 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
687 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
688 transactions: block.body().encoded_2718_transactions(),
689 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
690 })
691 .await
692 }
693
694 pub async fn get_payload_bodies_by_hash_v1_metered(
696 &self,
697 hashes: Vec<BlockHash>,
698 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
699 let start = Instant::now();
700 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
701 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
702 res
703 }
704
705 pub async fn get_payload_bodies_by_hash_v2(
709 &self,
710 hashes: Vec<BlockHash>,
711 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
712 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV2 {
713 transactions: block.body().encoded_2718_transactions(),
714 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
715 block_access_list: None,
716 })
717 .await
718 }
719
720 pub async fn get_payload_bodies_by_hash_v2_metered(
722 &self,
723 hashes: Vec<BlockHash>,
724 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
725 let start = Instant::now();
726 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
727 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
728 res
729 }
730
731 async fn validate_and_execute_forkchoice(
745 &self,
746 version: EngineApiMessageVersion,
747 state: ForkchoiceState,
748 payload_attrs: Option<EngineT::PayloadAttributes>,
749 ) -> EngineApiResult<ForkchoiceUpdated> {
750 if let Some(ref attrs) = payload_attrs {
751 let attr_validation_res =
752 self.inner.validator.ensure_well_formed_attributes(version, attrs);
753
754 if let Err(err) = attr_validation_res {
764 let fcu_res =
765 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
766 if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
767 return Ok(fcu_res)
768 }
769 return Err(err.into())
770 }
771 }
772
773 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
774 }
775
776 pub fn capabilities(&self) -> &EngineCapabilities {
778 &self.inner.capabilities
779 }
780
781 fn get_blobs_v1(
782 &self,
783 versioned_hashes: Vec<B256>,
784 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
785 let current_timestamp =
787 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
788 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
789 return Err(EngineApiError::EngineObjectValidationError(
790 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
791 ));
792 }
793
794 if versioned_hashes.len() > MAX_BLOB_LIMIT {
795 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
796 }
797
798 self.inner
799 .tx_pool
800 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
801 .map_err(|err| EngineApiError::Internal(Box::new(err)))
802 }
803
804 pub fn get_blobs_v1_metered(
806 &self,
807 versioned_hashes: Vec<B256>,
808 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
809 let hashes_len = versioned_hashes.len();
810 let start = Instant::now();
811 let res = Self::get_blobs_v1(self, versioned_hashes);
812 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
813
814 if let Ok(blobs) = &res {
815 let blobs_found = blobs.iter().flatten().count();
816 let blobs_missed = hashes_len - blobs_found;
817
818 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
819 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
820 }
821
822 res
823 }
824
825 fn get_blobs_v2(
826 &self,
827 versioned_hashes: Vec<B256>,
828 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
829 let current_timestamp =
831 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
832 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
833 return Err(EngineApiError::EngineObjectValidationError(
834 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
835 ));
836 }
837
838 if versioned_hashes.len() > MAX_BLOB_LIMIT {
839 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
840 }
841
842 self.inner
843 .tx_pool
844 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
845 .map_err(|err| EngineApiError::Internal(Box::new(err)))
846 }
847
848 fn get_blobs_v3(
849 &self,
850 versioned_hashes: Vec<B256>,
851 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
852 let current_timestamp =
854 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
855 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
856 return Err(EngineApiError::EngineObjectValidationError(
857 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
858 ));
859 }
860
861 if versioned_hashes.len() > MAX_BLOB_LIMIT {
862 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
863 }
864
865 if (*self.inner.is_syncing)() {
867 return Ok(None)
868 }
869
870 self.inner
871 .tx_pool
872 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
873 .map(Some)
874 .map_err(|err| EngineApiError::Internal(Box::new(err)))
875 }
876
877 pub fn get_blobs_v2_metered(
879 &self,
880 versioned_hashes: Vec<B256>,
881 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
882 let hashes_len = versioned_hashes.len();
883 let start = Instant::now();
884 let res = Self::get_blobs_v2(self, versioned_hashes);
885 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
886
887 if let Ok(blobs) = &res {
888 let blobs_found = blobs.iter().flatten().count();
889
890 self.inner
891 .metrics
892 .blob_metrics
893 .get_blobs_requests_blobs_total
894 .increment(hashes_len as u64);
895 self.inner
896 .metrics
897 .blob_metrics
898 .get_blobs_requests_blobs_in_blobpool_total
899 .increment(blobs_found as u64);
900
901 if blobs_found == hashes_len {
902 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
903 } else {
904 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
905 }
906 } else {
907 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
908 }
909
910 res
911 }
912
913 pub fn get_blobs_v3_metered(
915 &self,
916 versioned_hashes: Vec<B256>,
917 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
918 let hashes_len = versioned_hashes.len();
919 let start = Instant::now();
920 let res = Self::get_blobs_v3(self, versioned_hashes);
921 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
922
923 if let Ok(Some(blobs)) = &res {
924 let blobs_found = blobs.iter().flatten().count();
925 let blobs_missed = hashes_len - blobs_found;
926
927 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
928 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
929 }
930
931 res
932 }
933}
934
935#[async_trait]
937impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
938 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
939where
940 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
941 EngineT: EngineTypes<ExecutionData = ExecutionData>,
942 Pool: TransactionPool + 'static,
943 Validator: EngineApiValidator<EngineT>,
944 ChainSpec: EthereumHardforks + Send + Sync + 'static,
945{
946 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
950 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
951 let payload =
952 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
953 Ok(self.new_payload_v1_metered(payload).await?)
954 }
955
956 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
959 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
960 let payload = ExecutionData {
961 payload: payload.into_payload(),
962 sidecar: ExecutionPayloadSidecar::none(),
963 };
964
965 Ok(self.new_payload_v2_metered(payload).await?)
966 }
967
968 async fn new_payload_v3(
971 &self,
972 payload: ExecutionPayloadV3,
973 versioned_hashes: Vec<B256>,
974 parent_beacon_block_root: B256,
975 ) -> RpcResult<PayloadStatus> {
976 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
977 let payload = ExecutionData {
978 payload: payload.into(),
979 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
980 versioned_hashes,
981 parent_beacon_block_root,
982 }),
983 };
984
985 Ok(self.new_payload_v3_metered(payload).await?)
986 }
987
988 async fn new_payload_v4(
991 &self,
992 payload: ExecutionPayloadV3,
993 versioned_hashes: Vec<B256>,
994 parent_beacon_block_root: B256,
995 requests: RequestsOrHash,
996 ) -> RpcResult<PayloadStatus> {
997 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
998
999 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1001 return Err(EngineApiError::UnexpectedRequestsHash.into());
1002 }
1003
1004 let payload = ExecutionData {
1005 payload: payload.into(),
1006 sidecar: ExecutionPayloadSidecar::v4(
1007 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1008 PraguePayloadFields { requests },
1009 ),
1010 };
1011
1012 Ok(self.new_payload_v4_metered(payload).await?)
1013 }
1014
1015 async fn new_payload_v5(
1021 &self,
1022 _payload: ExecutionPayloadV4,
1023 _versioned_hashes: Vec<B256>,
1024 _parent_beacon_block_root: B256,
1025 _execution_requests: RequestsOrHash,
1026 ) -> RpcResult<PayloadStatus> {
1027 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1028 Err(EngineApiError::EngineObjectValidationError(
1029 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1030 ))?
1031 }
1032
1033 async fn fork_choice_updated_v1(
1038 &self,
1039 fork_choice_state: ForkchoiceState,
1040 payload_attributes: Option<EngineT::PayloadAttributes>,
1041 ) -> RpcResult<ForkchoiceUpdated> {
1042 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1043 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1044 }
1045
1046 async fn fork_choice_updated_v2(
1049 &self,
1050 fork_choice_state: ForkchoiceState,
1051 payload_attributes: Option<EngineT::PayloadAttributes>,
1052 ) -> RpcResult<ForkchoiceUpdated> {
1053 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1054 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1055 }
1056
1057 async fn fork_choice_updated_v3(
1061 &self,
1062 fork_choice_state: ForkchoiceState,
1063 payload_attributes: Option<EngineT::PayloadAttributes>,
1064 ) -> RpcResult<ForkchoiceUpdated> {
1065 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1066 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1067 }
1068
1069 async fn get_payload_v1(
1081 &self,
1082 payload_id: PayloadId,
1083 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1084 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1085 Ok(self.get_payload_v1_metered(payload_id).await?)
1086 }
1087
1088 async fn get_payload_v2(
1098 &self,
1099 payload_id: PayloadId,
1100 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1101 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1102 Ok(self.get_payload_v2_metered(payload_id).await?)
1103 }
1104
1105 async fn get_payload_v3(
1115 &self,
1116 payload_id: PayloadId,
1117 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1118 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1119 Ok(self.get_payload_v3_metered(payload_id).await?)
1120 }
1121
1122 async fn get_payload_v4(
1132 &self,
1133 payload_id: PayloadId,
1134 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1135 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1136 Ok(self.get_payload_v4_metered(payload_id).await?)
1137 }
1138
1139 async fn get_payload_v5(
1149 &self,
1150 payload_id: PayloadId,
1151 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1152 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1153 Ok(self.get_payload_v5_metered(payload_id).await?)
1154 }
1155
1156 async fn get_payload_v6(
1162 &self,
1163 _payload_id: PayloadId,
1164 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1165 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1166 Err(EngineApiError::EngineObjectValidationError(
1167 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1168 ))?
1169 }
1170
1171 async fn get_payload_bodies_by_hash_v1(
1174 &self,
1175 block_hashes: Vec<BlockHash>,
1176 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1177 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1178 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1179 }
1180
1181 async fn get_payload_bodies_by_hash_v2(
1187 &self,
1188 block_hashes: Vec<BlockHash>,
1189 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1190 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1191 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1192 }
1193
1194 async fn get_payload_bodies_by_range_v1(
1211 &self,
1212 start: U64,
1213 count: U64,
1214 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1215 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1216 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1217 }
1218
1219 async fn get_payload_bodies_by_range_v2(
1225 &self,
1226 start: U64,
1227 count: U64,
1228 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1229 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1230 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1231 }
1232
1233 async fn get_client_version_v1(
1237 &self,
1238 client: ClientVersionV1,
1239 ) -> RpcResult<Vec<ClientVersionV1>> {
1240 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1241 Ok(Self::get_client_version_v1(self, client)?)
1242 }
1243
1244 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1247 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1248
1249 let el_caps = self.capabilities();
1250 el_caps.log_capability_mismatches(&capabilities);
1251
1252 Ok(el_caps.list())
1253 }
1254
1255 async fn get_blobs_v1(
1256 &self,
1257 versioned_hashes: Vec<B256>,
1258 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1259 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1260 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1261 }
1262
1263 async fn get_blobs_v2(
1264 &self,
1265 versioned_hashes: Vec<B256>,
1266 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1267 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1268 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1269 }
1270
1271 async fn get_blobs_v3(
1272 &self,
1273 versioned_hashes: Vec<B256>,
1274 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1275 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1276 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1277 }
1278}
1279
1280impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1281 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1282where
1283 EngineT: EngineTypes,
1284 Self: EngineApiServer<EngineT>,
1285{
1286 fn into_rpc_module(self) -> RpcModule<()> {
1287 EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1288 }
1289}
1290
1291impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1292 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1293where
1294 PayloadT: PayloadTypes,
1295{
1296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1297 f.debug_struct("EngineApi").finish_non_exhaustive()
1298 }
1299}
1300
1301impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1302 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1303where
1304 PayloadT: PayloadTypes,
1305{
1306 fn clone(&self) -> Self {
1307 Self { inner: Arc::clone(&self.inner) }
1308 }
1309}
1310
1311struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1313 provider: Provider,
1315 chain_spec: Arc<ChainSpec>,
1317 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1319 payload_store: PayloadStore<PayloadT>,
1321 task_spawner: Runtime,
1323 metrics: EngineApiMetrics,
1325 client: ClientVersionV1,
1327 capabilities: EngineCapabilities,
1329 tx_pool: Pool,
1331 validator: Validator,
1333 accept_execution_requests_hash: bool,
1334 is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1336}
1337
1338#[cfg(test)]
1339mod tests {
1340 use super::*;
1341 use alloy_primitives::{Address, B256};
1342 use alloy_rpc_types_engine::{
1343 ClientCode, ClientVersionV1, PayloadAttributes, PayloadStatusEnum,
1344 };
1345 use assert_matches::assert_matches;
1346 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1347 use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1348 use reth_ethereum_engine_primitives::EthEngineTypes;
1349 use reth_ethereum_primitives::Block;
1350 use reth_network_api::{
1351 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1352 };
1353 use reth_node_ethereum::EthereumEngineValidator;
1354 use reth_payload_builder::test_utils::spawn_test_payload_service;
1355 use reth_provider::test_utils::MockEthProvider;
1356 use reth_tasks::Runtime;
1357 use reth_transaction_pool::noop::NoopTransactionPool;
1358 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1359
1360 fn setup_engine_api() -> (
1361 EngineApiTestHandle,
1362 EngineApi<
1363 Arc<MockEthProvider>,
1364 EthEngineTypes,
1365 NoopTransactionPool,
1366 EthereumEngineValidator,
1367 ChainSpec,
1368 >,
1369 ) {
1370 let client = ClientVersionV1 {
1371 code: ClientCode::RH,
1372 name: "Reth".to_string(),
1373 version: "v0.2.0-beta.5".to_string(),
1374 commit: "defa64b2".to_string(),
1375 };
1376
1377 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1378 let provider = Arc::new(MockEthProvider::default());
1379 let payload_store = spawn_test_payload_service();
1380 let (to_engine, engine_rx) = unbounded_channel();
1381 let task_executor = Runtime::test();
1382 let api = EngineApi::new(
1383 provider.clone(),
1384 chain_spec.clone(),
1385 ConsensusEngineHandle::new(to_engine),
1386 payload_store.into(),
1387 NoopTransactionPool::default(),
1388 task_executor,
1389 client,
1390 EngineCapabilities::default(),
1391 EthereumEngineValidator::new(chain_spec.clone()),
1392 false,
1393 NoopNetwork::default(),
1394 );
1395 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1396 (handle, api)
1397 }
1398
1399 #[tokio::test]
1400 async fn engine_client_version_v1() {
1401 let client = ClientVersionV1 {
1402 code: ClientCode::RH,
1403 name: "Reth".to_string(),
1404 version: "v0.2.0-beta.5".to_string(),
1405 commit: "defa64b2".to_string(),
1406 };
1407 let (_, api) = setup_engine_api();
1408 let res = api.get_client_version_v1(client.clone());
1409 assert_eq!(res.unwrap(), vec![client]);
1410 }
1411
1412 struct EngineApiTestHandle {
1413 #[allow(dead_code)]
1414 chain_spec: Arc<ChainSpec>,
1415 provider: Arc<MockEthProvider>,
1416 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1417 }
1418
1419 #[tokio::test]
1420 async fn forwards_responses_to_consensus_engine() {
1421 let (mut handle, api) = setup_engine_api();
1422
1423 tokio::spawn(async move {
1424 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1425 let execution_data = ExecutionData {
1426 payload: payload_v1.into(),
1427 sidecar: ExecutionPayloadSidecar::none(),
1428 };
1429
1430 api.new_payload_v1(execution_data).await.unwrap();
1431 });
1432 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1433 }
1434
1435 #[derive(Clone)]
1436 struct TestNetworkInfo {
1437 syncing: bool,
1438 }
1439
1440 impl NetworkInfo for TestNetworkInfo {
1441 fn local_addr(&self) -> std::net::SocketAddr {
1442 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1443 }
1444
1445 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1446 #[allow(deprecated)]
1447 Ok(NetworkStatus {
1448 client_version: "test".to_string(),
1449 protocol_version: 5,
1450 eth_protocol_info: EthProtocolInfo {
1451 network: 1,
1452 difficulty: None,
1453 genesis: Default::default(),
1454 config: Default::default(),
1455 head: Default::default(),
1456 },
1457 capabilities: vec![],
1458 })
1459 }
1460
1461 fn chain_id(&self) -> u64 {
1462 1
1463 }
1464
1465 fn is_syncing(&self) -> bool {
1466 self.syncing
1467 }
1468
1469 fn is_initially_syncing(&self) -> bool {
1470 self.syncing
1471 }
1472 }
1473
1474 #[tokio::test]
1475 async fn get_blobs_v3_returns_null_when_syncing() {
1476 let chain_spec: Arc<ChainSpec> =
1477 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1478 let provider = Arc::new(MockEthProvider::default());
1479 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1480 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1481
1482 let api = EngineApi::new(
1483 provider,
1484 chain_spec.clone(),
1485 ConsensusEngineHandle::new(to_engine),
1486 payload_store.into(),
1487 NoopTransactionPool::default(),
1488 Runtime::test(),
1489 ClientVersionV1 {
1490 code: ClientCode::RH,
1491 name: "Reth".to_string(),
1492 version: "v0.0.0-test".to_string(),
1493 commit: "test".to_string(),
1494 },
1495 EngineCapabilities::default(),
1496 EthereumEngineValidator::new(chain_spec),
1497 false,
1498 TestNetworkInfo { syncing: true },
1499 );
1500
1501 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1502 assert_matches!(res, Ok(None));
1503 }
1504
1505 #[tokio::test]
1506 async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
1507 let (mut handle, api) = setup_engine_api();
1508
1509 let state = ForkchoiceState {
1510 head_block_hash: B256::from([0x11; 32]),
1511 safe_block_hash: B256::ZERO,
1512 finalized_block_hash: B256::ZERO,
1513 };
1514 let payload_attributes = PayloadAttributes {
1515 timestamp: 1,
1516 prev_randao: B256::ZERO,
1517 suggested_fee_recipient: Address::ZERO,
1518 withdrawals: Some(vec![]),
1519 parent_beacon_block_root: None,
1521 };
1522
1523 let api_task = tokio::spawn(async move {
1524 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1525 });
1526
1527 let request =
1528 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1529 .await
1530 .expect("timed out waiting for forkchoiceUpdated request")
1531 .expect("expected forkchoiceUpdated request");
1532 let response_tx = match request {
1533 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1534 assert!(
1535 payload_attrs.is_none(),
1536 "FCU for syncing state should be evaluated before payload attributes"
1537 );
1538 tx
1539 }
1540 other => panic!("unexpected engine message: {other:?}"),
1541 };
1542
1543 response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
1544
1545 let response = api_task
1546 .await
1547 .expect("api task should not panic")
1548 .expect("forkchoiceUpdatedV3 should return a syncing response");
1549 assert!(response.payload_status.is_syncing());
1550 assert!(response.payload_id.is_none());
1551 }
1552
1553 #[tokio::test]
1554 async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
1555 let (mut handle, api) = setup_engine_api();
1556
1557 let state = ForkchoiceState {
1558 head_block_hash: B256::from([0x22; 32]),
1559 safe_block_hash: B256::ZERO,
1560 finalized_block_hash: B256::ZERO,
1561 };
1562 let payload_attributes = PayloadAttributes {
1563 timestamp: 1,
1564 prev_randao: B256::ZERO,
1565 suggested_fee_recipient: Address::ZERO,
1566 withdrawals: Some(vec![]),
1567 parent_beacon_block_root: None,
1568 };
1569
1570 let api_task = tokio::spawn(async move {
1571 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1572 });
1573
1574 let request =
1575 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1576 .await
1577 .expect("timed out waiting for forkchoiceUpdated request")
1578 .expect("expected forkchoiceUpdated request");
1579
1580 let response_tx = match request {
1581 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1582 assert!(
1583 payload_attrs.is_none(),
1584 "when attrs are invalid, API should first evaluate forkchoice without attrs"
1585 );
1586 tx
1587 }
1588 other => panic!("unexpected engine message: {other:?}"),
1589 };
1590
1591 response_tx
1592 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1593 PayloadStatusEnum::Valid,
1594 ))))
1595 .expect("send valid response");
1596
1597 let response = api_task.await.expect("api task should not panic");
1598 assert_matches!(
1599 response,
1600 Err(EngineApiError::EngineObjectValidationError(
1601 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
1602 ))
1603 );
1604
1605 match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
1606 .await
1607 {
1608 Err(_) | Ok(None) => {}
1609 Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
1610 panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
1611 }
1612 Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
1613 }
1614 }
1615
1616 mod get_payload_bodies {
1618 use super::*;
1619 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1620 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1621
1622 #[tokio::test]
1623 async fn invalid_params() {
1624 let (_, api) = setup_engine_api();
1625
1626 let by_range_tests = [
1627 (0, 0),
1629 (0, 1),
1630 (1, 0),
1631 ];
1632
1633 for (start, count) in by_range_tests {
1635 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1636 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1637 }
1638 }
1639
1640 #[tokio::test]
1641 async fn request_too_large() {
1642 let (_, api) = setup_engine_api();
1643
1644 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1645 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1646 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1647 }
1648
1649 #[tokio::test]
1650 async fn returns_payload_bodies() {
1651 let mut rng = generators::rng();
1652 let (handle, api) = setup_engine_api();
1653
1654 let (start, count) = (1, 10);
1655 let blocks = random_block_range(
1656 &mut rng,
1657 start..=start + count - 1,
1658 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1659 );
1660 handle
1661 .provider
1662 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1663
1664 let expected = blocks
1665 .iter()
1666 .cloned()
1667 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1668 .collect::<Vec<_>>();
1669
1670 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1671 assert_eq!(res, expected);
1672 }
1673
1674 #[tokio::test]
1675 async fn returns_payload_bodies_with_gaps() {
1676 let mut rng = generators::rng();
1677 let (handle, api) = setup_engine_api();
1678
1679 let (start, count) = (1, 100);
1680 let blocks = random_block_range(
1681 &mut rng,
1682 start..=start + count - 1,
1683 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1684 );
1685
1686 let first_missing_range = 26..=50;
1688 let second_missing_range = 76..=100;
1689 handle.provider.extend_blocks(
1690 blocks
1691 .iter()
1692 .filter(|b| {
1693 !first_missing_range.contains(&b.number) &&
1694 !second_missing_range.contains(&b.number)
1695 })
1696 .map(|b| (b.hash(), b.clone().into_block())),
1697 );
1698
1699 let expected = blocks
1700 .iter()
1701 .filter(|b| !second_missing_range.contains(&b.number))
1704 .cloned()
1705 .map(|b| {
1706 if first_missing_range.contains(&b.number) {
1707 None
1708 } else {
1709 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1710 }
1711 })
1712 .collect::<Vec<_>>();
1713
1714 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1715 assert_eq!(res, expected);
1716
1717 let expected = blocks
1718 .iter()
1719 .cloned()
1720 .map(|b| {
1723 if first_missing_range.contains(&b.number) ||
1724 second_missing_range.contains(&b.number)
1725 {
1726 None
1727 } else {
1728 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1729 }
1730 })
1731 .collect::<Vec<_>>();
1732
1733 let hashes = blocks.iter().map(|b| b.hash()).collect();
1734 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1735 assert_eq!(res, expected);
1736 }
1737 }
1738}