1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::NetworkInfo;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::TaskSpawner;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
/// One-shot channel sender that delivers a single [`EngineApiResult`] whose success value is `Ok`.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
/// Largest number of payload bodies a single by-range or by-hash request may ask for.
///
/// Requests above this bound are rejected with [`EngineApiError::PayloadRequestTooLarge`].
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
45
/// Largest number of versioned hashes accepted by the `get_blobs_*` handlers.
///
/// Requests above this bound are rejected with [`EngineApiError::BlobRequestTooLarge`].
const MAX_BLOB_LIMIT: usize = 128;
48
/// Handle to the Engine API implementation.
///
/// The handle only stores an [`Arc`] to the shared inner state, so every clone of it refers to
/// the same provider, payload store, transaction pool, validator and metrics.
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Reference-counted state shared by all clones of this handle.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73 &self.inner.chain_spec
74 }
75}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
81 PayloadT: PayloadTypes,
82 Pool: TransactionPool + 'static,
83 Validator: EngineApiValidator<PayloadT>,
84 ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86 #[expect(clippy::too_many_arguments)]
88 pub fn new(
89 provider: Provider,
90 chain_spec: Arc<ChainSpec>,
91 beacon_consensus: ConsensusEngineHandle<PayloadT>,
92 payload_store: PayloadStore<PayloadT>,
93 tx_pool: Pool,
94 task_spawner: Box<dyn TaskSpawner>,
95 client: ClientVersionV1,
96 capabilities: EngineCapabilities,
97 validator: Validator,
98 accept_execution_requests_hash: bool,
99 network: impl NetworkInfo + 'static,
100 ) -> Self {
101 let is_syncing = Arc::new(move || network.is_syncing());
102 let inner = Arc::new(EngineApiInner {
103 provider,
104 chain_spec,
105 beacon_consensus,
106 payload_store,
107 task_spawner,
108 metrics: EngineApiMetrics::default(),
109 client,
110 capabilities,
111 tx_pool,
112 validator,
113 accept_execution_requests_hash,
114 is_syncing,
115 });
116 Self { inner }
117 }
118
119 pub fn get_client_version_v1(
121 &self,
122 _client: ClientVersionV1,
123 ) -> EngineApiResult<Vec<ClientVersionV1>> {
124 Ok(vec![self.inner.client.clone()])
125 }
126
127 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
129 Ok(self
130 .inner
131 .payload_store
132 .payload_timestamp(payload_id)
133 .await
134 .ok_or(EngineApiError::UnknownPayload)??)
135 }
136
137 pub async fn new_payload_v1(
140 &self,
141 payload: PayloadT::ExecutionData,
142 ) -> EngineApiResult<PayloadStatus> {
143 let payload_or_attrs = PayloadOrAttributes::<
144 '_,
145 PayloadT::ExecutionData,
146 PayloadT::PayloadAttributes,
147 >::from_execution_payload(&payload);
148
149 self.inner
150 .validator
151 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
154 }
155
156 pub async fn new_payload_v1_metered(
158 &self,
159 payload: PayloadT::ExecutionData,
160 ) -> EngineApiResult<PayloadStatus> {
161 let start = Instant::now();
162 let res = Self::new_payload_v1(self, payload).await;
163 let elapsed = start.elapsed();
164 self.inner.metrics.latency.new_payload_v1.record(elapsed);
165 res
166 }
167
168 pub async fn new_payload_v2(
170 &self,
171 payload: PayloadT::ExecutionData,
172 ) -> EngineApiResult<PayloadStatus> {
173 let payload_or_attrs = PayloadOrAttributes::<
174 '_,
175 PayloadT::ExecutionData,
176 PayloadT::PayloadAttributes,
177 >::from_execution_payload(&payload);
178 self.inner
179 .validator
180 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
181 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
182 }
183
184 pub async fn new_payload_v2_metered(
186 &self,
187 payload: PayloadT::ExecutionData,
188 ) -> EngineApiResult<PayloadStatus> {
189 let start = Instant::now();
190 let res = Self::new_payload_v2(self, payload).await;
191 let elapsed = start.elapsed();
192 self.inner.metrics.latency.new_payload_v2.record(elapsed);
193 res
194 }
195
196 pub async fn new_payload_v3(
198 &self,
199 payload: PayloadT::ExecutionData,
200 ) -> EngineApiResult<PayloadStatus> {
201 let payload_or_attrs = PayloadOrAttributes::<
202 '_,
203 PayloadT::ExecutionData,
204 PayloadT::PayloadAttributes,
205 >::from_execution_payload(&payload);
206 self.inner
207 .validator
208 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
209
210 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
211 }
212
213 pub async fn new_payload_v3_metered(
215 &self,
216 payload: PayloadT::ExecutionData,
217 ) -> RpcResult<PayloadStatus> {
218 let start = Instant::now();
219
220 let res = Self::new_payload_v3(self, payload).await;
221 let elapsed = start.elapsed();
222 self.inner.metrics.latency.new_payload_v3.record(elapsed);
223 Ok(res?)
224 }
225
226 pub async fn new_payload_v4(
228 &self,
229 payload: PayloadT::ExecutionData,
230 ) -> EngineApiResult<PayloadStatus> {
231 let payload_or_attrs = PayloadOrAttributes::<
232 '_,
233 PayloadT::ExecutionData,
234 PayloadT::PayloadAttributes,
235 >::from_execution_payload(&payload);
236 self.inner
237 .validator
238 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
239
240 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
241 }
242
243 pub async fn new_payload_v4_metered(
245 &self,
246 payload: PayloadT::ExecutionData,
247 ) -> RpcResult<PayloadStatus> {
248 let start = Instant::now();
249 let res = Self::new_payload_v4(self, payload).await;
250
251 let elapsed = start.elapsed();
252 self.inner.metrics.latency.new_payload_v4.record(elapsed);
253 Ok(res?)
254 }
255
256 pub fn accept_execution_requests_hash(&self) -> bool {
258 self.inner.accept_execution_requests_hash
259 }
260}
261
262impl<Provider, EngineT, Pool, Validator, ChainSpec>
263 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
264where
265 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
266 EngineT: EngineTypes,
267 Pool: TransactionPool + 'static,
268 Validator: EngineApiValidator<EngineT>,
269 ChainSpec: EthereumHardforks + Send + Sync + 'static,
270{
271 pub async fn fork_choice_updated_v1(
278 &self,
279 state: ForkchoiceState,
280 payload_attrs: Option<EngineT::PayloadAttributes>,
281 ) -> EngineApiResult<ForkchoiceUpdated> {
282 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
283 .await
284 }
285
286 pub async fn fork_choice_updated_v1_metered(
288 &self,
289 state: ForkchoiceState,
290 payload_attrs: Option<EngineT::PayloadAttributes>,
291 ) -> EngineApiResult<ForkchoiceUpdated> {
292 let start = Instant::now();
293 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
294 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
295 res
296 }
297
298 pub async fn fork_choice_updated_v2(
303 &self,
304 state: ForkchoiceState,
305 payload_attrs: Option<EngineT::PayloadAttributes>,
306 ) -> EngineApiResult<ForkchoiceUpdated> {
307 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
308 .await
309 }
310
311 pub async fn fork_choice_updated_v2_metered(
313 &self,
314 state: ForkchoiceState,
315 payload_attrs: Option<EngineT::PayloadAttributes>,
316 ) -> EngineApiResult<ForkchoiceUpdated> {
317 let start = Instant::now();
318 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
319 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
320 res
321 }
322
323 pub async fn fork_choice_updated_v3(
328 &self,
329 state: ForkchoiceState,
330 payload_attrs: Option<EngineT::PayloadAttributes>,
331 ) -> EngineApiResult<ForkchoiceUpdated> {
332 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
333 .await
334 }
335
336 pub async fn fork_choice_updated_v3_metered(
338 &self,
339 state: ForkchoiceState,
340 payload_attrs: Option<EngineT::PayloadAttributes>,
341 ) -> EngineApiResult<ForkchoiceUpdated> {
342 let start = Instant::now();
343 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
344 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
345 res
346 }
347
348 async fn get_built_payload(
350 &self,
351 payload_id: PayloadId,
352 ) -> EngineApiResult<EngineT::BuiltPayload> {
353 self.inner
354 .payload_store
355 .resolve(payload_id)
356 .await
357 .ok_or(EngineApiError::UnknownPayload)?
358 .map_err(|_| EngineApiError::UnknownPayload)
359 }
360
361 async fn get_payload_inner<R>(
364 &self,
365 payload_id: PayloadId,
366 version: EngineApiMessageVersion,
367 ) -> EngineApiResult<R>
368 where
369 EngineT::BuiltPayload: TryInto<R>,
370 {
371 let timestamp = self.get_payload_timestamp(payload_id).await?;
374 validate_payload_timestamp(
375 &self.inner.chain_spec,
376 version,
377 timestamp,
378 MessageValidationKind::GetPayload,
379 )?;
380
381 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
383 warn!(?version, "could not transform built payload");
384 EngineApiError::UnknownPayload
385 })
386 }
387
388 pub async fn get_payload_v1(
398 &self,
399 payload_id: PayloadId,
400 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
401 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
402 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
403 EngineApiError::UnknownPayload
404 })
405 }
406
407 pub async fn get_payload_v1_metered(
409 &self,
410 payload_id: PayloadId,
411 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
412 let start = Instant::now();
413 let res = Self::get_payload_v1(self, payload_id).await;
414 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
415 res
416 }
417
418 pub async fn get_payload_v2(
426 &self,
427 payload_id: PayloadId,
428 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
429 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
430 }
431
432 pub async fn get_payload_v2_metered(
434 &self,
435 payload_id: PayloadId,
436 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
437 let start = Instant::now();
438 let res = Self::get_payload_v2(self, payload_id).await;
439 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
440 res
441 }
442
443 pub async fn get_payload_v3(
451 &self,
452 payload_id: PayloadId,
453 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
454 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
455 }
456
457 pub async fn get_payload_v3_metered(
459 &self,
460 payload_id: PayloadId,
461 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
462 let start = Instant::now();
463 let res = Self::get_payload_v3(self, payload_id).await;
464 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
465 res
466 }
467
468 pub async fn get_payload_v4(
476 &self,
477 payload_id: PayloadId,
478 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
479 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
480 }
481
482 pub async fn get_payload_v4_metered(
484 &self,
485 payload_id: PayloadId,
486 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
487 let start = Instant::now();
488 let res = Self::get_payload_v4(self, payload_id).await;
489 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
490 res
491 }
492
493 pub async fn get_payload_v5(
503 &self,
504 payload_id: PayloadId,
505 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
506 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
507 }
508
509 pub async fn get_payload_v5_metered(
511 &self,
512 payload_id: PayloadId,
513 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
514 let start = Instant::now();
515 let res = Self::get_payload_v5(self, payload_id).await;
516 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
517 res
518 }
519
520 pub async fn get_payload_bodies_by_range_with<F, R>(
523 &self,
524 start: BlockNumber,
525 count: u64,
526 f: F,
527 ) -> EngineApiResult<Vec<Option<R>>>
528 where
529 F: Fn(Provider::Block) -> R + Send + 'static,
530 R: Send + 'static,
531 {
532 let (tx, rx) = oneshot::channel();
533 let inner = self.inner.clone();
534
535 self.inner.task_spawner.spawn_blocking_task(Box::pin(async move {
536 if count > MAX_PAYLOAD_BODIES_LIMIT {
537 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
538 return;
539 }
540
541 if start == 0 || count == 0 {
542 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
543 return;
544 }
545
546 let mut result = Vec::with_capacity(count as usize);
547
548 let mut end = start.saturating_add(count - 1);
550
551 if let Ok(best_block) = inner.provider.best_block_number()
554 && end > best_block {
555 end = best_block;
556 }
557
558 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
560 for num in start..=end {
561 if num < earliest_block {
562 result.push(None);
563 continue;
564 }
565 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
566 match block_result {
567 Ok(block) => {
568 result.push(block.map(&f));
569 }
570 Err(err) => {
571 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
572 return;
573 }
574 };
575 }
576 tx.send(Ok(result)).ok();
577 }));
578
579 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
580 }
581
582 pub async fn get_payload_bodies_by_range_v1(
593 &self,
594 start: BlockNumber,
595 count: u64,
596 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
597 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
598 transactions: block.body().encoded_2718_transactions(),
599 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
600 })
601 .await
602 }
603
604 pub async fn get_payload_bodies_by_range_v1_metered(
606 &self,
607 start: BlockNumber,
608 count: u64,
609 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
610 let start_time = Instant::now();
611 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
612 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
613 res
614 }
615
616 pub async fn get_payload_bodies_by_range_v2(
620 &self,
621 start: BlockNumber,
622 count: u64,
623 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
624 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 {
625 transactions: block.body().encoded_2718_transactions(),
626 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
627 block_access_list: None,
628 })
629 .await
630 }
631
632 pub async fn get_payload_bodies_by_range_v2_metered(
634 &self,
635 start: BlockNumber,
636 count: u64,
637 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
638 let start_time = Instant::now();
639 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
640 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
641 res
642 }
643
644 pub async fn get_payload_bodies_by_hash_with<F, R>(
646 &self,
647 hashes: Vec<BlockHash>,
648 f: F,
649 ) -> EngineApiResult<Vec<Option<R>>>
650 where
651 F: Fn(Provider::Block) -> R + Send + 'static,
652 R: Send + 'static,
653 {
654 let len = hashes.len() as u64;
655 if len > MAX_PAYLOAD_BODIES_LIMIT {
656 return Err(EngineApiError::PayloadRequestTooLarge { len });
657 }
658
659 let (tx, rx) = oneshot::channel();
660 let inner = self.inner.clone();
661
662 self.inner.task_spawner.spawn_blocking_task(Box::pin(async move {
663 let mut result = Vec::with_capacity(hashes.len());
664 for hash in hashes {
665 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
666 match block_result {
667 Ok(block) => {
668 result.push(block.map(&f));
669 }
670 Err(err) => {
671 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
672 return;
673 }
674 }
675 }
676 tx.send(Ok(result)).ok();
677 }));
678
679 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
680 }
681
682 pub async fn get_payload_bodies_by_hash_v1(
684 &self,
685 hashes: Vec<BlockHash>,
686 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
687 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
688 transactions: block.body().encoded_2718_transactions(),
689 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
690 })
691 .await
692 }
693
694 pub async fn get_payload_bodies_by_hash_v1_metered(
696 &self,
697 hashes: Vec<BlockHash>,
698 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
699 let start = Instant::now();
700 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
701 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
702 res
703 }
704
705 pub async fn get_payload_bodies_by_hash_v2(
709 &self,
710 hashes: Vec<BlockHash>,
711 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
712 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV2 {
713 transactions: block.body().encoded_2718_transactions(),
714 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
715 block_access_list: None,
716 })
717 .await
718 }
719
720 pub async fn get_payload_bodies_by_hash_v2_metered(
722 &self,
723 hashes: Vec<BlockHash>,
724 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
725 let start = Instant::now();
726 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
727 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
728 res
729 }
730
731 async fn validate_and_execute_forkchoice(
745 &self,
746 version: EngineApiMessageVersion,
747 state: ForkchoiceState,
748 payload_attrs: Option<EngineT::PayloadAttributes>,
749 ) -> EngineApiResult<ForkchoiceUpdated> {
750 if let Some(ref attrs) = payload_attrs {
751 let attr_validation_res =
752 self.inner.validator.ensure_well_formed_attributes(version, attrs);
753
754 if let Err(err) = attr_validation_res {
768 let fcu_res =
769 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
770 if fcu_res.is_invalid() {
773 return Ok(fcu_res)
774 }
775 return Err(err.into())
776 }
777 }
778
779 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
780 }
781
782 pub fn capabilities(&self) -> &EngineCapabilities {
784 &self.inner.capabilities
785 }
786
787 fn get_blobs_v1(
788 &self,
789 versioned_hashes: Vec<B256>,
790 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
791 let current_timestamp =
793 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
794 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
795 return Err(EngineApiError::EngineObjectValidationError(
796 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
797 ));
798 }
799
800 if versioned_hashes.len() > MAX_BLOB_LIMIT {
801 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
802 }
803
804 self.inner
805 .tx_pool
806 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
807 .map_err(|err| EngineApiError::Internal(Box::new(err)))
808 }
809
810 pub fn get_blobs_v1_metered(
812 &self,
813 versioned_hashes: Vec<B256>,
814 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
815 let hashes_len = versioned_hashes.len();
816 let start = Instant::now();
817 let res = Self::get_blobs_v1(self, versioned_hashes);
818 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
819
820 if let Ok(blobs) = &res {
821 let blobs_found = blobs.iter().flatten().count();
822 let blobs_missed = hashes_len - blobs_found;
823
824 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
825 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
826 }
827
828 res
829 }
830
831 fn get_blobs_v2(
832 &self,
833 versioned_hashes: Vec<B256>,
834 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
835 let current_timestamp =
837 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
838 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
839 return Err(EngineApiError::EngineObjectValidationError(
840 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
841 ));
842 }
843
844 if versioned_hashes.len() > MAX_BLOB_LIMIT {
845 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
846 }
847
848 self.inner
849 .tx_pool
850 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
851 .map_err(|err| EngineApiError::Internal(Box::new(err)))
852 }
853
854 fn get_blobs_v3(
855 &self,
856 versioned_hashes: Vec<B256>,
857 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
858 let current_timestamp =
860 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
861 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
862 return Err(EngineApiError::EngineObjectValidationError(
863 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
864 ));
865 }
866
867 if versioned_hashes.len() > MAX_BLOB_LIMIT {
868 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
869 }
870
871 if (*self.inner.is_syncing)() {
873 return Ok(None)
874 }
875
876 self.inner
877 .tx_pool
878 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
879 .map(Some)
880 .map_err(|err| EngineApiError::Internal(Box::new(err)))
881 }
882
883 pub fn get_blobs_v2_metered(
885 &self,
886 versioned_hashes: Vec<B256>,
887 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
888 let hashes_len = versioned_hashes.len();
889 let start = Instant::now();
890 let res = Self::get_blobs_v2(self, versioned_hashes);
891 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
892
893 if let Ok(blobs) = &res {
894 let blobs_found = blobs.iter().flatten().count();
895
896 self.inner
897 .metrics
898 .blob_metrics
899 .get_blobs_requests_blobs_total
900 .increment(hashes_len as u64);
901 self.inner
902 .metrics
903 .blob_metrics
904 .get_blobs_requests_blobs_in_blobpool_total
905 .increment(blobs_found as u64);
906
907 if blobs_found == hashes_len {
908 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
909 } else {
910 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
911 }
912 } else {
913 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
914 }
915
916 res
917 }
918
919 pub fn get_blobs_v3_metered(
921 &self,
922 versioned_hashes: Vec<B256>,
923 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
924 let hashes_len = versioned_hashes.len();
925 let start = Instant::now();
926 let res = Self::get_blobs_v3(self, versioned_hashes);
927 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
928
929 if let Ok(Some(blobs)) = &res {
930 let blobs_found = blobs.iter().flatten().count();
931 let blobs_missed = hashes_len - blobs_found;
932
933 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
934 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
935 }
936
937 res
938 }
939}
940
941#[async_trait]
943impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
944 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
945where
946 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
947 EngineT: EngineTypes<ExecutionData = ExecutionData>,
948 Pool: TransactionPool + 'static,
949 Validator: EngineApiValidator<EngineT>,
950 ChainSpec: EthereumHardforks + Send + Sync + 'static,
951{
952 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
956 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
957 let payload =
958 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
959 Ok(self.new_payload_v1_metered(payload).await?)
960 }
961
962 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
965 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
966 let payload = ExecutionData {
967 payload: payload.into_payload(),
968 sidecar: ExecutionPayloadSidecar::none(),
969 };
970
971 Ok(self.new_payload_v2_metered(payload).await?)
972 }
973
974 async fn new_payload_v3(
977 &self,
978 payload: ExecutionPayloadV3,
979 versioned_hashes: Vec<B256>,
980 parent_beacon_block_root: B256,
981 ) -> RpcResult<PayloadStatus> {
982 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
983 let payload = ExecutionData {
984 payload: payload.into(),
985 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
986 versioned_hashes,
987 parent_beacon_block_root,
988 }),
989 };
990
991 Ok(self.new_payload_v3_metered(payload).await?)
992 }
993
994 async fn new_payload_v4(
997 &self,
998 payload: ExecutionPayloadV3,
999 versioned_hashes: Vec<B256>,
1000 parent_beacon_block_root: B256,
1001 requests: RequestsOrHash,
1002 ) -> RpcResult<PayloadStatus> {
1003 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1004
1005 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1007 return Err(EngineApiError::UnexpectedRequestsHash.into());
1008 }
1009
1010 let payload = ExecutionData {
1011 payload: payload.into(),
1012 sidecar: ExecutionPayloadSidecar::v4(
1013 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1014 PraguePayloadFields { requests },
1015 ),
1016 };
1017
1018 Ok(self.new_payload_v4_metered(payload).await?)
1019 }
1020
1021 async fn new_payload_v5(
1027 &self,
1028 _payload: ExecutionPayloadV4,
1029 _versioned_hashes: Vec<B256>,
1030 _parent_beacon_block_root: B256,
1031 _execution_requests: RequestsOrHash,
1032 ) -> RpcResult<PayloadStatus> {
1033 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1034 Err(EngineApiError::EngineObjectValidationError(
1035 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1036 ))?
1037 }
1038
1039 async fn fork_choice_updated_v1(
1044 &self,
1045 fork_choice_state: ForkchoiceState,
1046 payload_attributes: Option<EngineT::PayloadAttributes>,
1047 ) -> RpcResult<ForkchoiceUpdated> {
1048 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1049 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1050 }
1051
1052 async fn fork_choice_updated_v2(
1055 &self,
1056 fork_choice_state: ForkchoiceState,
1057 payload_attributes: Option<EngineT::PayloadAttributes>,
1058 ) -> RpcResult<ForkchoiceUpdated> {
1059 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1060 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1061 }
1062
1063 async fn fork_choice_updated_v3(
1067 &self,
1068 fork_choice_state: ForkchoiceState,
1069 payload_attributes: Option<EngineT::PayloadAttributes>,
1070 ) -> RpcResult<ForkchoiceUpdated> {
1071 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1072 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1073 }
1074
1075 async fn get_payload_v1(
1087 &self,
1088 payload_id: PayloadId,
1089 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1090 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1091 Ok(self.get_payload_v1_metered(payload_id).await?)
1092 }
1093
1094 async fn get_payload_v2(
1104 &self,
1105 payload_id: PayloadId,
1106 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1107 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1108 Ok(self.get_payload_v2_metered(payload_id).await?)
1109 }
1110
1111 async fn get_payload_v3(
1121 &self,
1122 payload_id: PayloadId,
1123 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1124 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1125 Ok(self.get_payload_v3_metered(payload_id).await?)
1126 }
1127
1128 async fn get_payload_v4(
1138 &self,
1139 payload_id: PayloadId,
1140 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1141 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1142 Ok(self.get_payload_v4_metered(payload_id).await?)
1143 }
1144
1145 async fn get_payload_v5(
1155 &self,
1156 payload_id: PayloadId,
1157 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1158 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1159 Ok(self.get_payload_v5_metered(payload_id).await?)
1160 }
1161
1162 async fn get_payload_v6(
1168 &self,
1169 _payload_id: PayloadId,
1170 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1171 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1172 Err(EngineApiError::EngineObjectValidationError(
1173 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1174 ))?
1175 }
1176
1177 async fn get_payload_bodies_by_hash_v1(
1180 &self,
1181 block_hashes: Vec<BlockHash>,
1182 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1183 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1184 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1185 }
1186
1187 async fn get_payload_bodies_by_hash_v2(
1193 &self,
1194 block_hashes: Vec<BlockHash>,
1195 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1196 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1197 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1198 }
1199
1200 async fn get_payload_bodies_by_range_v1(
1217 &self,
1218 start: U64,
1219 count: U64,
1220 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1221 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1222 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1223 }
1224
1225 async fn get_payload_bodies_by_range_v2(
1231 &self,
1232 start: U64,
1233 count: U64,
1234 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1235 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1236 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1237 }
1238
1239 async fn get_client_version_v1(
1243 &self,
1244 client: ClientVersionV1,
1245 ) -> RpcResult<Vec<ClientVersionV1>> {
1246 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1247 Ok(Self::get_client_version_v1(self, client)?)
1248 }
1249
1250 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1253 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1254
1255 let el_caps = self.capabilities();
1256 el_caps.log_capability_mismatches(&capabilities);
1257
1258 Ok(el_caps.list())
1259 }
1260
1261 async fn get_blobs_v1(
1262 &self,
1263 versioned_hashes: Vec<B256>,
1264 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1265 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1266 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1267 }
1268
1269 async fn get_blobs_v2(
1270 &self,
1271 versioned_hashes: Vec<B256>,
1272 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1273 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1274 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1275 }
1276
1277 async fn get_blobs_v3(
1278 &self,
1279 versioned_hashes: Vec<B256>,
1280 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1281 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1282 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1283 }
1284}
1285
1286impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1287 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1288where
1289 EngineT: EngineTypes,
1290 Self: EngineApiServer<EngineT>,
1291{
1292 fn into_rpc_module(self) -> RpcModule<()> {
1293 self.into_rpc().remove_context()
1294 }
1295}
1296
1297impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1298 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1299where
1300 PayloadT: PayloadTypes,
1301{
1302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1303 f.debug_struct("EngineApi").finish_non_exhaustive()
1304 }
1305}
1306
1307impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1308 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1309where
1310 PayloadT: PayloadTypes,
1311{
1312 fn clone(&self) -> Self {
1313 Self { inner: Arc::clone(&self.inner) }
1314 }
1315}
1316
1317struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1319 provider: Provider,
1321 chain_spec: Arc<ChainSpec>,
1323 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1325 payload_store: PayloadStore<PayloadT>,
1327 task_spawner: Box<dyn TaskSpawner>,
1329 metrics: EngineApiMetrics,
1331 client: ClientVersionV1,
1333 capabilities: EngineCapabilities,
1335 tx_pool: Pool,
1337 validator: Validator,
1339 accept_execution_requests_hash: bool,
1340 is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1342}
1343
1344#[cfg(test)]
1345mod tests {
1346 use super::*;
1347 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1348 use assert_matches::assert_matches;
1349 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1350 use reth_engine_primitives::BeaconEngineMessage;
1351 use reth_ethereum_engine_primitives::EthEngineTypes;
1352 use reth_ethereum_primitives::Block;
1353 use reth_network_api::{
1354 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1355 };
1356 use reth_node_ethereum::EthereumEngineValidator;
1357 use reth_payload_builder::test_utils::spawn_test_payload_service;
1358 use reth_provider::test_utils::MockEthProvider;
1359 use reth_tasks::TokioTaskExecutor;
1360 use reth_transaction_pool::noop::NoopTransactionPool;
1361 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1362
1363 fn setup_engine_api() -> (
1364 EngineApiTestHandle,
1365 EngineApi<
1366 Arc<MockEthProvider>,
1367 EthEngineTypes,
1368 NoopTransactionPool,
1369 EthereumEngineValidator,
1370 ChainSpec,
1371 >,
1372 ) {
1373 let client = ClientVersionV1 {
1374 code: ClientCode::RH,
1375 name: "Reth".to_string(),
1376 version: "v0.2.0-beta.5".to_string(),
1377 commit: "defa64b2".to_string(),
1378 };
1379
1380 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1381 let provider = Arc::new(MockEthProvider::default());
1382 let payload_store = spawn_test_payload_service();
1383 let (to_engine, engine_rx) = unbounded_channel();
1384 let task_executor = Box::<TokioTaskExecutor>::default();
1385 let api = EngineApi::new(
1386 provider.clone(),
1387 chain_spec.clone(),
1388 ConsensusEngineHandle::new(to_engine),
1389 payload_store.into(),
1390 NoopTransactionPool::default(),
1391 task_executor,
1392 client,
1393 EngineCapabilities::default(),
1394 EthereumEngineValidator::new(chain_spec.clone()),
1395 false,
1396 NoopNetwork::default(),
1397 );
1398 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1399 (handle, api)
1400 }
1401
1402 #[tokio::test]
1403 async fn engine_client_version_v1() {
1404 let client = ClientVersionV1 {
1405 code: ClientCode::RH,
1406 name: "Reth".to_string(),
1407 version: "v0.2.0-beta.5".to_string(),
1408 commit: "defa64b2".to_string(),
1409 };
1410 let (_, api) = setup_engine_api();
1411 let res = api.get_client_version_v1(client.clone());
1412 assert_eq!(res.unwrap(), vec![client]);
1413 }
1414
1415 struct EngineApiTestHandle {
1416 #[allow(dead_code)]
1417 chain_spec: Arc<ChainSpec>,
1418 provider: Arc<MockEthProvider>,
1419 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1420 }
1421
1422 #[tokio::test]
1423 async fn forwards_responses_to_consensus_engine() {
1424 let (mut handle, api) = setup_engine_api();
1425
1426 tokio::spawn(async move {
1427 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1428 let execution_data = ExecutionData {
1429 payload: payload_v1.into(),
1430 sidecar: ExecutionPayloadSidecar::none(),
1431 };
1432
1433 api.new_payload_v1(execution_data).await.unwrap();
1434 });
1435 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1436 }
1437
/// Minimal [`NetworkInfo`] stub whose sync state is fixed at construction time.
#[derive(Clone)]
struct TestNetworkInfo {
    /// Value reported by both `is_syncing` and `is_initially_syncing`.
    syncing: bool,
}
1442
1443 impl NetworkInfo for TestNetworkInfo {
1444 fn local_addr(&self) -> std::net::SocketAddr {
1445 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1446 }
1447
1448 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1449 #[allow(deprecated)]
1450 Ok(NetworkStatus {
1451 client_version: "test".to_string(),
1452 protocol_version: 5,
1453 eth_protocol_info: EthProtocolInfo {
1454 network: 1,
1455 difficulty: None,
1456 genesis: Default::default(),
1457 config: Default::default(),
1458 head: Default::default(),
1459 },
1460 capabilities: vec![],
1461 })
1462 }
1463
1464 fn chain_id(&self) -> u64 {
1465 1
1466 }
1467
1468 fn is_syncing(&self) -> bool {
1469 self.syncing
1470 }
1471
1472 fn is_initially_syncing(&self) -> bool {
1473 self.syncing
1474 }
1475 }
1476
1477 #[tokio::test]
1478 async fn get_blobs_v3_returns_null_when_syncing() {
1479 let chain_spec: Arc<ChainSpec> =
1480 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1481 let provider = Arc::new(MockEthProvider::default());
1482 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1483 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1484
1485 let api = EngineApi::new(
1486 provider,
1487 chain_spec.clone(),
1488 ConsensusEngineHandle::new(to_engine),
1489 payload_store.into(),
1490 NoopTransactionPool::default(),
1491 Box::<TokioTaskExecutor>::default(),
1492 ClientVersionV1 {
1493 code: ClientCode::RH,
1494 name: "Reth".to_string(),
1495 version: "v0.0.0-test".to_string(),
1496 commit: "test".to_string(),
1497 },
1498 EngineCapabilities::default(),
1499 EthereumEngineValidator::new(chain_spec),
1500 false,
1501 TestNetworkInfo { syncing: true },
1502 );
1503
1504 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1505 assert_matches!(res, Ok(None));
1506 }
1507
1508 mod get_payload_bodies {
1510 use super::*;
1511 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1512 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1513
1514 #[tokio::test]
1515 async fn invalid_params() {
1516 let (_, api) = setup_engine_api();
1517
1518 let by_range_tests = [
1519 (0, 0),
1521 (0, 1),
1522 (1, 0),
1523 ];
1524
1525 for (start, count) in by_range_tests {
1527 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1528 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1529 }
1530 }
1531
1532 #[tokio::test]
1533 async fn request_too_large() {
1534 let (_, api) = setup_engine_api();
1535
1536 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1537 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1538 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1539 }
1540
1541 #[tokio::test]
1542 async fn returns_payload_bodies() {
1543 let mut rng = generators::rng();
1544 let (handle, api) = setup_engine_api();
1545
1546 let (start, count) = (1, 10);
1547 let blocks = random_block_range(
1548 &mut rng,
1549 start..=start + count - 1,
1550 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1551 );
1552 handle
1553 .provider
1554 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1555
1556 let expected = blocks
1557 .iter()
1558 .cloned()
1559 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1560 .collect::<Vec<_>>();
1561
1562 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1563 assert_eq!(res, expected);
1564 }
1565
1566 #[tokio::test]
1567 async fn returns_payload_bodies_with_gaps() {
1568 let mut rng = generators::rng();
1569 let (handle, api) = setup_engine_api();
1570
1571 let (start, count) = (1, 100);
1572 let blocks = random_block_range(
1573 &mut rng,
1574 start..=start + count - 1,
1575 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1576 );
1577
1578 let first_missing_range = 26..=50;
1580 let second_missing_range = 76..=100;
1581 handle.provider.extend_blocks(
1582 blocks
1583 .iter()
1584 .filter(|b| {
1585 !first_missing_range.contains(&b.number) &&
1586 !second_missing_range.contains(&b.number)
1587 })
1588 .map(|b| (b.hash(), b.clone().into_block())),
1589 );
1590
1591 let expected = blocks
1592 .iter()
1593 .filter(|b| !second_missing_range.contains(&b.number))
1596 .cloned()
1597 .map(|b| {
1598 if first_missing_range.contains(&b.number) {
1599 None
1600 } else {
1601 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1602 }
1603 })
1604 .collect::<Vec<_>>();
1605
1606 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1607 assert_eq!(res, expected);
1608
1609 let expected = blocks
1610 .iter()
1611 .cloned()
1612 .map(|b| {
1615 if first_missing_range.contains(&b.number) ||
1616 second_missing_range.contains(&b.number)
1617 {
1618 None
1619 } else {
1620 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1621 }
1622 })
1623 .collect::<Vec<_>>();
1624
1625 let hashes = blocks.iter().map(|b| b.hash()).collect();
1626 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1627 assert_eq!(res, expected);
1628 }
1629 }
1630}