1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B128, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::{CellCustody, NetworkInfo};
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BalProvider, BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
/// One-shot channel sender that carries an [`EngineApiResult`] with the given success type.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;

/// Largest number of payload bodies (or block hashes) accepted by a single bodies request.
/// Requests above this bound are rejected with `EngineApiError::PayloadRequestTooLarge`.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// Largest number of versioned hashes accepted by a single blobs request.
/// Requests above this bound are rejected with `EngineApiError::BlobRequestTooLarge`.
const MAX_BLOB_LIMIT: usize = 128;
48
/// Entry point for the engine API operations implemented in this module.
///
/// All state is kept in a single [`EngineApiInner`] value that is held behind an [`Arc`].
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Shared inner state used by every engine API method.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73 &self.inner.chain_spec
74 }
75
76 pub fn client_version(&self) -> &ClientVersionV1 {
78 &self.inner.client
79 }
80}
81
82impl<Provider, PayloadT, Pool, Validator, ChainSpec>
83 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
84where
85 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
86 PayloadT: PayloadTypes,
87 Pool: TransactionPool + 'static,
88 Validator: EngineApiValidator<PayloadT>,
89 ChainSpec: EthereumHardforks + Send + Sync + 'static,
90{
91 #[expect(clippy::too_many_arguments)]
93 pub fn new(
94 provider: Provider,
95 chain_spec: Arc<ChainSpec>,
96 beacon_consensus: ConsensusEngineHandle<PayloadT>,
97 payload_store: PayloadStore<PayloadT>,
98 tx_pool: Pool,
99 task_spawner: Runtime,
100 client: ClientVersionV1,
101 capabilities: EngineCapabilities,
102 validator: Validator,
103 accept_execution_requests_hash: bool,
104 network: impl NetworkInfo + 'static,
105 ) -> Self {
106 let cell_custody = network.cell_custody().clone();
107 let is_syncing = Arc::new(move || network.is_syncing());
108 let inner = Arc::new(EngineApiInner {
109 provider,
110 chain_spec,
111 beacon_consensus,
112 payload_store,
113 task_spawner,
114 metrics: EngineApiMetrics::default(),
115 client,
116 capabilities,
117 tx_pool,
118 validator,
119 accept_execution_requests_hash,
120 cell_custody,
121 is_syncing,
122 });
123 Self { inner }
124 }
125
126 pub fn get_client_version_v1(
128 &self,
129 _client: ClientVersionV1,
130 ) -> EngineApiResult<Vec<ClientVersionV1>> {
131 Ok(vec![self.inner.client.clone()])
132 }
133
134 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
136 Ok(self
137 .inner
138 .payload_store
139 .payload_timestamp(payload_id)
140 .await
141 .ok_or(EngineApiError::UnknownPayload)??)
142 }
143
144 pub async fn new_payload_v1(
147 &self,
148 payload: PayloadT::ExecutionData,
149 ) -> EngineApiResult<PayloadStatus> {
150 let payload_or_attrs = PayloadOrAttributes::<
151 '_,
152 PayloadT::ExecutionData,
153 PayloadT::PayloadAttributes,
154 >::from_execution_payload(&payload);
155
156 self.inner
157 .validator
158 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
159
160 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
161 }
162
163 pub async fn new_payload_v1_metered(
165 &self,
166 payload: PayloadT::ExecutionData,
167 ) -> EngineApiResult<PayloadStatus> {
168 let start = Instant::now();
169 let res = Self::new_payload_v1(self, payload).await;
170 let elapsed = start.elapsed();
171 self.inner.metrics.latency.new_payload_v1.record(elapsed);
172 res
173 }
174
175 pub async fn new_payload_v2(
177 &self,
178 payload: PayloadT::ExecutionData,
179 ) -> EngineApiResult<PayloadStatus> {
180 let payload_or_attrs = PayloadOrAttributes::<
181 '_,
182 PayloadT::ExecutionData,
183 PayloadT::PayloadAttributes,
184 >::from_execution_payload(&payload);
185 self.inner
186 .validator
187 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
188 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
189 }
190
191 pub async fn new_payload_v2_metered(
193 &self,
194 payload: PayloadT::ExecutionData,
195 ) -> EngineApiResult<PayloadStatus> {
196 let start = Instant::now();
197 let res = Self::new_payload_v2(self, payload).await;
198 let elapsed = start.elapsed();
199 self.inner.metrics.latency.new_payload_v2.record(elapsed);
200 res
201 }
202
203 pub async fn new_payload_v3(
205 &self,
206 payload: PayloadT::ExecutionData,
207 ) -> EngineApiResult<PayloadStatus> {
208 let payload_or_attrs = PayloadOrAttributes::<
209 '_,
210 PayloadT::ExecutionData,
211 PayloadT::PayloadAttributes,
212 >::from_execution_payload(&payload);
213 self.inner
214 .validator
215 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
216
217 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
218 }
219
220 pub async fn new_payload_v3_metered(
222 &self,
223 payload: PayloadT::ExecutionData,
224 ) -> RpcResult<PayloadStatus> {
225 let start = Instant::now();
226
227 let res = Self::new_payload_v3(self, payload).await;
228 let elapsed = start.elapsed();
229 self.inner.metrics.latency.new_payload_v3.record(elapsed);
230 Ok(res?)
231 }
232
233 pub async fn new_payload_v4(
235 &self,
236 payload: PayloadT::ExecutionData,
237 ) -> EngineApiResult<PayloadStatus> {
238 let payload_or_attrs = PayloadOrAttributes::<
239 '_,
240 PayloadT::ExecutionData,
241 PayloadT::PayloadAttributes,
242 >::from_execution_payload(&payload);
243 self.inner
244 .validator
245 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
246
247 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
248 }
249
250 pub async fn new_payload_v4_metered(
252 &self,
253 payload: PayloadT::ExecutionData,
254 ) -> RpcResult<PayloadStatus> {
255 let start = Instant::now();
256 let res = Self::new_payload_v4(self, payload).await;
257
258 let elapsed = start.elapsed();
259 self.inner.metrics.latency.new_payload_v4.record(elapsed);
260 Ok(res?)
261 }
262
263 pub async fn new_payload_v5(
269 &self,
270 payload: PayloadT::ExecutionData,
271 ) -> EngineApiResult<PayloadStatus> {
272 let payload_or_attrs = PayloadOrAttributes::<
273 '_,
274 PayloadT::ExecutionData,
275 PayloadT::PayloadAttributes,
276 >::from_execution_payload(&payload);
277 self.inner
278 .validator
279 .validate_version_specific_fields(EngineApiMessageVersion::V5, payload_or_attrs)?;
280 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
281 }
282
283 pub async fn new_payload_v5_metered(
285 &self,
286 payload: PayloadT::ExecutionData,
287 ) -> RpcResult<PayloadStatus> {
288 let start = Instant::now();
289 let res = Self::new_payload_v5(self, payload).await;
290 let elapsed = start.elapsed();
291 self.inner.metrics.latency.new_payload_v5.record(elapsed);
292 Ok(res?)
293 }
294
295 pub fn accept_execution_requests_hash(&self) -> bool {
297 self.inner.accept_execution_requests_hash
298 }
299}
300
301impl<Provider, EngineT, Pool, Validator, ChainSpec>
302 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
303where
304 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
305 EngineT: EngineTypes,
306 Pool: TransactionPool + 'static,
307 Validator: EngineApiValidator<EngineT>,
308 ChainSpec: EthereumHardforks + Send + Sync + 'static,
309{
310 pub async fn fork_choice_updated_v1(
317 &self,
318 state: ForkchoiceState,
319 payload_attrs: Option<EngineT::PayloadAttributes>,
320 ) -> EngineApiResult<ForkchoiceUpdated> {
321 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
322 .await
323 }
324
325 pub async fn fork_choice_updated_v1_metered(
327 &self,
328 state: ForkchoiceState,
329 payload_attrs: Option<EngineT::PayloadAttributes>,
330 ) -> EngineApiResult<ForkchoiceUpdated> {
331 let start = Instant::now();
332 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
333 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
334 res
335 }
336
337 pub async fn fork_choice_updated_v2(
342 &self,
343 state: ForkchoiceState,
344 payload_attrs: Option<EngineT::PayloadAttributes>,
345 ) -> EngineApiResult<ForkchoiceUpdated> {
346 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
347 .await
348 }
349
350 pub async fn fork_choice_updated_v2_metered(
352 &self,
353 state: ForkchoiceState,
354 payload_attrs: Option<EngineT::PayloadAttributes>,
355 ) -> EngineApiResult<ForkchoiceUpdated> {
356 let start = Instant::now();
357 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
358 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
359 res
360 }
361
362 pub async fn fork_choice_updated_v3(
367 &self,
368 state: ForkchoiceState,
369 payload_attrs: Option<EngineT::PayloadAttributes>,
370 ) -> EngineApiResult<ForkchoiceUpdated> {
371 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
372 .await
373 }
374
375 pub async fn fork_choice_updated_v3_metered(
377 &self,
378 state: ForkchoiceState,
379 payload_attrs: Option<EngineT::PayloadAttributes>,
380 ) -> EngineApiResult<ForkchoiceUpdated> {
381 let start = Instant::now();
382 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
383 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
384 res
385 }
386
387 pub async fn fork_choice_updated_v4(
392 &self,
393 state: ForkchoiceState,
394 payload_attrs: Option<EngineT::PayloadAttributes>,
395 custody_columns: Option<B128>,
396 ) -> EngineApiResult<ForkchoiceUpdated> {
397 if let Some(custody_columns) = custody_columns {
398 self.inner.cell_custody.set(custody_columns);
399 }
400 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V4, state, payload_attrs)
401 .await
402 }
403
404 pub async fn fork_choice_updated_v4_metered(
406 &self,
407 state: ForkchoiceState,
408 payload_attrs: Option<EngineT::PayloadAttributes>,
409 custody_columns: Option<B128>,
410 ) -> EngineApiResult<ForkchoiceUpdated> {
411 let start = Instant::now();
412 let res = Self::fork_choice_updated_v4(self, state, payload_attrs, custody_columns).await;
413 self.inner.metrics.latency.fork_choice_updated_v4.record(start.elapsed());
414 res
415 }
416
417 async fn get_built_payload(
419 &self,
420 payload_id: PayloadId,
421 ) -> EngineApiResult<EngineT::BuiltPayload> {
422 self.inner
423 .payload_store
424 .resolve(payload_id)
425 .await
426 .ok_or(EngineApiError::UnknownPayload)?
427 .map_err(|_| EngineApiError::UnknownPayload)
428 }
429
430 async fn get_payload_inner<R>(
433 &self,
434 payload_id: PayloadId,
435 version: EngineApiMessageVersion,
436 ) -> EngineApiResult<R>
437 where
438 EngineT::BuiltPayload: TryInto<R>,
439 {
440 let timestamp = self.get_payload_timestamp(payload_id).await?;
443 validate_payload_timestamp(
444 &self.inner.chain_spec,
445 version,
446 timestamp,
447 MessageValidationKind::GetPayload,
448 )?;
449
450 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
452 warn!(?version, "could not transform built payload");
453 EngineApiError::UnknownPayload
454 })
455 }
456
457 pub async fn get_payload_v1(
467 &self,
468 payload_id: PayloadId,
469 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
470 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
471 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
472 EngineApiError::UnknownPayload
473 })
474 }
475
476 pub async fn get_payload_v1_metered(
478 &self,
479 payload_id: PayloadId,
480 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
481 let start = Instant::now();
482 let res = Self::get_payload_v1(self, payload_id).await;
483 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
484 res
485 }
486
487 pub async fn get_payload_v2(
495 &self,
496 payload_id: PayloadId,
497 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
498 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
499 }
500
501 pub async fn get_payload_v2_metered(
503 &self,
504 payload_id: PayloadId,
505 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
506 let start = Instant::now();
507 let res = Self::get_payload_v2(self, payload_id).await;
508 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
509 res
510 }
511
512 pub async fn get_payload_v3(
520 &self,
521 payload_id: PayloadId,
522 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
523 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
524 }
525
526 pub async fn get_payload_v3_metered(
528 &self,
529 payload_id: PayloadId,
530 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
531 let start = Instant::now();
532 let res = Self::get_payload_v3(self, payload_id).await;
533 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
534 res
535 }
536
537 pub async fn get_payload_v4(
545 &self,
546 payload_id: PayloadId,
547 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
548 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
549 }
550
551 pub async fn get_payload_v4_metered(
553 &self,
554 payload_id: PayloadId,
555 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
556 let start = Instant::now();
557 let res = Self::get_payload_v4(self, payload_id).await;
558 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
559 res
560 }
561
562 pub async fn get_payload_v5(
572 &self,
573 payload_id: PayloadId,
574 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
575 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
576 }
577
578 pub async fn get_payload_v5_metered(
580 &self,
581 payload_id: PayloadId,
582 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
583 let start = Instant::now();
584 let res = Self::get_payload_v5(self, payload_id).await;
585 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
586 res
587 }
588
589 pub async fn get_payload_v6(
595 &self,
596 payload_id: PayloadId,
597 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
598 self.get_payload_inner(payload_id, EngineApiMessageVersion::V6).await
599 }
600
601 pub async fn get_payload_v6_metered(
603 &self,
604 payload_id: PayloadId,
605 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
606 let start = Instant::now();
607 let res = Self::get_payload_v6(self, payload_id).await;
608 self.inner.metrics.latency.get_payload_v6.record(start.elapsed());
609 res
610 }
611
612 pub async fn get_payload_bodies_by_range_with<F, R>(
615 &self,
616 start: BlockNumber,
617 count: u64,
618 f: F,
619 ) -> EngineApiResult<Vec<Option<R>>>
620 where
621 F: Fn(Provider::Block) -> R + Send + 'static,
622 R: Send + 'static,
623 {
624 let (tx, rx) = oneshot::channel();
625 let inner = self.inner.clone();
626
627 self.inner.task_spawner.spawn_blocking_task(async move {
628 if count > MAX_PAYLOAD_BODIES_LIMIT {
629 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
630 return;
631 }
632
633 if start == 0 || count == 0 {
634 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
635 return;
636 }
637
638 let mut result = Vec::with_capacity(count as usize);
639
640 let mut end = start.saturating_add(count - 1);
642
643 if let Ok(best_block) = inner.provider.best_block_number()
646 && end > best_block {
647 end = best_block;
648 }
649
650 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
652 for num in start..=end {
653 if num < earliest_block {
654 result.push(None);
655 continue;
656 }
657 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
658 match block_result {
659 Ok(block) => {
660 result.push(block.map(&f));
661 }
662 Err(err) => {
663 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
664 return;
665 }
666 };
667 }
668 tx.send(Ok(result)).ok();
669 });
670
671 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
672 }
673
674 pub async fn get_payload_bodies_by_range_v1(
685 &self,
686 start: BlockNumber,
687 count: u64,
688 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
689 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
690 transactions: block.body().encoded_2718_transactions(),
691 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
692 })
693 .await
694 }
695
696 pub async fn get_payload_bodies_by_range_v1_metered(
698 &self,
699 start: BlockNumber,
700 count: u64,
701 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
702 let start_time = Instant::now();
703 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
704 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
705 res
706 }
707
708 pub async fn get_payload_bodies_by_range_v2(
712 &self,
713 start: BlockNumber,
714 count: u64,
715 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
716 let mut payload_bodies = self
717 .get_payload_bodies_by_range_with(start, count, |block| {
718 let block_hash = block.header().hash_slow();
719 (
720 block_hash,
721 ExecutionPayloadBodyV2 {
722 transactions: block.body().encoded_2718_transactions(),
723 withdrawals: block
724 .body()
725 .withdrawals()
726 .cloned()
727 .map(Withdrawals::into_inner),
728 block_access_list: None,
729 },
730 )
731 })
732 .await?;
733
734 let block_hashes = payload_bodies
735 .iter()
736 .filter_map(|payload_body| payload_body.as_ref().map(|(block_hash, _)| *block_hash))
737 .collect::<Vec<_>>();
738 let block_access_lists = self.get_block_access_lists_by_hashes(block_hashes).await?;
739
740 for (payload_body, block_access_list) in
741 payload_bodies.iter_mut().filter_map(Option::as_mut).zip(block_access_lists)
742 {
743 payload_body.1.block_access_list = block_access_list;
744 }
745
746 Ok(payload_bodies
747 .into_iter()
748 .map(|payload_body| payload_body.map(|(_, payload_body)| payload_body))
749 .collect())
750 }
751
752 pub async fn get_payload_bodies_by_range_v2_metered(
754 &self,
755 start: BlockNumber,
756 count: u64,
757 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
758 let start_time = Instant::now();
759 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
760 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
761 res
762 }
763
764 pub async fn get_payload_bodies_by_hash_with<F, R>(
766 &self,
767 hashes: Vec<BlockHash>,
768 f: F,
769 ) -> EngineApiResult<Vec<Option<R>>>
770 where
771 F: Fn(Provider::Block) -> R + Send + 'static,
772 R: Send + 'static,
773 {
774 let len = hashes.len() as u64;
775 if len > MAX_PAYLOAD_BODIES_LIMIT {
776 return Err(EngineApiError::PayloadRequestTooLarge { len });
777 }
778
779 let (tx, rx) = oneshot::channel();
780 let inner = self.inner.clone();
781
782 self.inner.task_spawner.spawn_blocking_task(async move {
783 let mut result = Vec::with_capacity(hashes.len());
784 for hash in hashes {
785 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
786 match block_result {
787 Ok(block) => {
788 result.push(block.map(&f));
789 }
790 Err(err) => {
791 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
792 return;
793 }
794 }
795 }
796 tx.send(Ok(result)).ok();
797 });
798
799 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
800 }
801
802 async fn get_block_access_lists_by_hashes(
803 &self,
804 hashes: Vec<BlockHash>,
805 ) -> EngineApiResult<Vec<Option<Bytes>>> {
806 let len = hashes.len() as u64;
807 if len > MAX_PAYLOAD_BODIES_LIMIT {
808 return Err(EngineApiError::PayloadRequestTooLarge { len });
809 }
810
811 let (tx, rx) = oneshot::channel();
812 let bal_store = self.inner.provider.bal_store().clone();
813
814 self.inner.task_spawner.spawn_blocking_task(async move {
815 tx.send(
816 bal_store
817 .get_by_hashes(&hashes)
818 .map_err(|err| EngineApiError::Internal(Box::new(err))),
819 )
820 .ok();
821 });
822
823 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
824 }
825
826 pub async fn get_payload_bodies_by_hash_v1(
828 &self,
829 hashes: Vec<BlockHash>,
830 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
831 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
832 transactions: block.body().encoded_2718_transactions(),
833 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
834 })
835 .await
836 }
837
838 pub async fn get_payload_bodies_by_hash_v1_metered(
840 &self,
841 hashes: Vec<BlockHash>,
842 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
843 let start = Instant::now();
844 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
845 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
846 res
847 }
848
849 pub async fn get_payload_bodies_by_hash_v2(
853 &self,
854 hashes: Vec<BlockHash>,
855 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
856 let payload_bodies =
857 self.get_payload_bodies_by_hash_with(hashes.clone(), |block| ExecutionPayloadBodyV2 {
858 transactions: block.body().encoded_2718_transactions(),
859 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
860 block_access_list: None,
861 });
862 let block_access_lists = self.get_block_access_lists_by_hashes(hashes);
863 let (mut payload_bodies, block_access_lists) =
864 tokio::try_join!(payload_bodies, block_access_lists)?;
865
866 for (payload_body, block_access_list) in payload_bodies.iter_mut().zip(block_access_lists) {
867 if let Some(payload_body) = payload_body {
868 payload_body.block_access_list = block_access_list;
869 }
870 }
871
872 Ok(payload_bodies)
873 }
874
875 pub async fn get_payload_bodies_by_hash_v2_metered(
877 &self,
878 hashes: Vec<BlockHash>,
879 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
880 let start = Instant::now();
881 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
882 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
883 res
884 }
885
886 async fn validate_and_execute_forkchoice(
900 &self,
901 version: EngineApiMessageVersion,
902 state: ForkchoiceState,
903 payload_attrs: Option<EngineT::PayloadAttributes>,
904 ) -> EngineApiResult<ForkchoiceUpdated> {
905 if let Some(ref attrs) = payload_attrs {
906 let attr_validation_res =
907 self.inner.validator.ensure_well_formed_attributes(version, attrs);
908
909 if let Err(err) = attr_validation_res {
919 let fcu_res = self.inner.beacon_consensus.fork_choice_updated(state, None).await?;
920 if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
921 return Ok(fcu_res)
922 }
923 return Err(err.into())
924 }
925 }
926
927 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
928 }
929
930 pub fn capabilities(&self) -> &EngineCapabilities {
932 &self.inner.capabilities
933 }
934
935 fn get_blobs_v1(
936 &self,
937 versioned_hashes: Vec<B256>,
938 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
939 let current_timestamp =
941 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
942 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
943 return Err(EngineApiError::EngineObjectValidationError(
944 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
945 ));
946 }
947
948 if versioned_hashes.len() > MAX_BLOB_LIMIT {
949 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
950 }
951
952 self.inner
953 .tx_pool
954 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
955 .map_err(|err| EngineApiError::Internal(Box::new(err)))
956 }
957
958 pub fn get_blobs_v1_metered(
960 &self,
961 versioned_hashes: Vec<B256>,
962 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
963 let hashes_len = versioned_hashes.len();
964 let start = Instant::now();
965 let res = Self::get_blobs_v1(self, versioned_hashes);
966 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
967
968 if let Ok(blobs) = &res {
969 let blobs_found = blobs.iter().flatten().count();
970 let blobs_missed = hashes_len - blobs_found;
971
972 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
973 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
974 }
975
976 res
977 }
978
979 fn get_blobs_v2(
980 &self,
981 versioned_hashes: Vec<B256>,
982 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
983 let current_timestamp =
985 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
986 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
987 return Err(EngineApiError::EngineObjectValidationError(
988 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
989 ));
990 }
991
992 if versioned_hashes.len() > MAX_BLOB_LIMIT {
993 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
994 }
995
996 self.inner
997 .tx_pool
998 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
999 .map_err(|err| EngineApiError::Internal(Box::new(err)))
1000 }
1001
1002 fn get_blobs_v3(
1003 &self,
1004 versioned_hashes: Vec<B256>,
1005 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1006 let current_timestamp =
1008 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
1009 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
1010 return Err(EngineApiError::EngineObjectValidationError(
1011 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1012 ));
1013 }
1014
1015 if versioned_hashes.len() > MAX_BLOB_LIMIT {
1016 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1017 }
1018
1019 if (*self.inner.is_syncing)() {
1021 return Ok(None)
1022 }
1023
1024 self.inner
1025 .tx_pool
1026 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
1027 .map(Some)
1028 .map_err(|err| EngineApiError::Internal(Box::new(err)))
1029 }
1030
1031 fn get_blobs_v4(
1032 &self,
1033 versioned_hashes: Vec<B256>,
1034 indices_bitarray: B128,
1035 ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1036 let current_timestamp =
1037 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
1038 if !self.inner.chain_spec.is_amsterdam_active_at_timestamp(current_timestamp) {
1039 return Err(EngineApiError::EngineObjectValidationError(
1040 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1041 ));
1042 }
1043
1044 if versioned_hashes.len() > MAX_BLOB_LIMIT {
1045 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1046 }
1047
1048 if (*self.inner.is_syncing)() {
1050 return Ok(None)
1051 }
1052
1053 self.inner
1054 .tx_pool
1055 .get_blobs_for_versioned_hashes_v4(&versioned_hashes, indices_bitarray)
1056 .map(Some)
1057 .map_err(|err| EngineApiError::Internal(Box::new(err)))
1058 }
1059
1060 pub fn get_blobs_v2_metered(
1062 &self,
1063 versioned_hashes: Vec<B256>,
1064 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
1065 let hashes_len = versioned_hashes.len();
1066 let start = Instant::now();
1067 let res = Self::get_blobs_v2(self, versioned_hashes);
1068 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
1069
1070 if let Ok(blobs) = &res {
1071 let blobs_found = blobs.iter().flatten().count();
1072
1073 self.inner
1074 .metrics
1075 .blob_metrics
1076 .get_blobs_requests_blobs_total
1077 .increment(hashes_len as u64);
1078 self.inner
1079 .metrics
1080 .blob_metrics
1081 .get_blobs_requests_blobs_in_blobpool_total
1082 .increment(blobs_found as u64);
1083
1084 if blobs_found == hashes_len {
1085 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
1086 } else {
1087 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1088 }
1089 } else {
1090 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1091 }
1092
1093 res
1094 }
1095
1096 pub fn get_blobs_v3_metered(
1098 &self,
1099 versioned_hashes: Vec<B256>,
1100 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1101 let hashes_len = versioned_hashes.len();
1102 let start = Instant::now();
1103 let res = Self::get_blobs_v3(self, versioned_hashes);
1104 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
1105
1106 if let Ok(Some(blobs)) = &res {
1107 let blobs_found = blobs.iter().flatten().count();
1108 let blobs_missed = hashes_len - blobs_found;
1109
1110 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1111 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1112 }
1113
1114 res
1115 }
1116
1117 pub fn get_blobs_v4_metered(
1119 &self,
1120 versioned_hashes: Vec<B256>,
1121 indices_bitarray: B128,
1122 ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1123 let hashes_len = versioned_hashes.len();
1124 let start = Instant::now();
1125 let res = Self::get_blobs_v4(self, versioned_hashes, indices_bitarray);
1126 self.inner.metrics.latency.get_blobs_v4.record(start.elapsed());
1127
1128 if let Ok(Some(blobs)) = &res {
1129 let blobs_found = blobs.iter().flatten().count();
1130 let blobs_missed = hashes_len - blobs_found;
1131
1132 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1133 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1134 }
1135
1136 res
1137 }
1138}
1139
1140#[async_trait]
1142impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
1143 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1144where
1145 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
1146 EngineT: EngineTypes<ExecutionData = ExecutionData>,
1147 Pool: TransactionPool + 'static,
1148 Validator: EngineApiValidator<EngineT>,
1149 ChainSpec: EthereumHardforks + Send + Sync + 'static,
1150{
1151 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
1155 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
1156 let payload =
1157 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
1158 Ok(self.new_payload_v1_metered(payload).await?)
1159 }
1160
1161 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
1164 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
1165 let payload = ExecutionData {
1166 payload: payload.into_payload(),
1167 sidecar: ExecutionPayloadSidecar::none(),
1168 };
1169
1170 Ok(self.new_payload_v2_metered(payload).await?)
1171 }
1172
1173 async fn new_payload_v3(
1176 &self,
1177 payload: ExecutionPayloadV3,
1178 versioned_hashes: Vec<B256>,
1179 parent_beacon_block_root: B256,
1180 ) -> RpcResult<PayloadStatus> {
1181 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
1182 let payload = ExecutionData {
1183 payload: payload.into(),
1184 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
1185 versioned_hashes,
1186 parent_beacon_block_root,
1187 }),
1188 };
1189
1190 Ok(self.new_payload_v3_metered(payload).await?)
1191 }
1192
1193 async fn new_payload_v4(
1196 &self,
1197 payload: ExecutionPayloadV3,
1198 versioned_hashes: Vec<B256>,
1199 parent_beacon_block_root: B256,
1200 requests: RequestsOrHash,
1201 ) -> RpcResult<PayloadStatus> {
1202 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1203
1204 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1206 return Err(EngineApiError::UnexpectedRequestsHash.into());
1207 }
1208
1209 let payload = ExecutionData {
1210 payload: payload.into(),
1211 sidecar: ExecutionPayloadSidecar::v4(
1212 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1213 PraguePayloadFields { requests },
1214 ),
1215 };
1216
1217 Ok(self.new_payload_v4_metered(payload).await?)
1218 }
1219
1220 async fn new_payload_v5(
1226 &self,
1227 payload: ExecutionPayloadV4,
1228 versioned_hashes: Vec<B256>,
1229 parent_beacon_block_root: B256,
1230 requests: RequestsOrHash,
1231 ) -> RpcResult<PayloadStatus> {
1232 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1233 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1235 return Err(EngineApiError::UnexpectedRequestsHash.into());
1236 }
1237
1238 let payload = ExecutionData {
1239 payload: payload.into(),
1240 sidecar: ExecutionPayloadSidecar::v4(
1241 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1242 PraguePayloadFields { requests },
1243 ),
1244 };
1245
1246 Ok(self.new_payload_v5_metered(payload).await?)
1247 }
1248
1249 async fn fork_choice_updated_v1(
1254 &self,
1255 fork_choice_state: ForkchoiceState,
1256 payload_attributes: Option<EngineT::PayloadAttributes>,
1257 ) -> RpcResult<ForkchoiceUpdated> {
1258 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1259 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1260 }
1261
1262 async fn fork_choice_updated_v2(
1265 &self,
1266 fork_choice_state: ForkchoiceState,
1267 payload_attributes: Option<EngineT::PayloadAttributes>,
1268 ) -> RpcResult<ForkchoiceUpdated> {
1269 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1270 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1271 }
1272
1273 async fn fork_choice_updated_v3(
1277 &self,
1278 fork_choice_state: ForkchoiceState,
1279 payload_attributes: Option<EngineT::PayloadAttributes>,
1280 ) -> RpcResult<ForkchoiceUpdated> {
1281 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1282 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1283 }
1284
1285 async fn fork_choice_updated_v4(
1289 &self,
1290 fork_choice_state: ForkchoiceState,
1291 payload_attributes: Option<EngineT::PayloadAttributes>,
1292 custody_columns: Option<B128>,
1293 ) -> RpcResult<ForkchoiceUpdated> {
1294 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV4");
1295 Ok(self
1296 .fork_choice_updated_v4_metered(fork_choice_state, payload_attributes, custody_columns)
1297 .await?)
1298 }
1299
1300 async fn get_payload_v1(
1312 &self,
1313 payload_id: PayloadId,
1314 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1315 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1316 Ok(self.get_payload_v1_metered(payload_id).await?)
1317 }
1318
1319 async fn get_payload_v2(
1329 &self,
1330 payload_id: PayloadId,
1331 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1332 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1333 Ok(self.get_payload_v2_metered(payload_id).await?)
1334 }
1335
1336 async fn get_payload_v3(
1346 &self,
1347 payload_id: PayloadId,
1348 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1349 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1350 Ok(self.get_payload_v3_metered(payload_id).await?)
1351 }
1352
1353 async fn get_payload_v4(
1363 &self,
1364 payload_id: PayloadId,
1365 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1366 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1367 Ok(self.get_payload_v4_metered(payload_id).await?)
1368 }
1369
1370 async fn get_payload_v5(
1380 &self,
1381 payload_id: PayloadId,
1382 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1383 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1384 Ok(self.get_payload_v5_metered(payload_id).await?)
1385 }
1386
1387 async fn get_payload_v6(
1393 &self,
1394 payload_id: PayloadId,
1395 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1396 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1397 Ok(self.get_payload_v6_metered(payload_id).await?)
1398 }
1399
1400 async fn get_payload_bodies_by_hash_v1(
1403 &self,
1404 block_hashes: Vec<BlockHash>,
1405 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1406 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1407 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1408 }
1409
1410 async fn get_payload_bodies_by_hash_v2(
1416 &self,
1417 block_hashes: Vec<BlockHash>,
1418 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1419 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1420 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1421 }
1422
1423 async fn get_payload_bodies_by_range_v1(
1440 &self,
1441 start: U64,
1442 count: U64,
1443 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1444 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1445 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1446 }
1447
1448 async fn get_payload_bodies_by_range_v2(
1454 &self,
1455 start: U64,
1456 count: U64,
1457 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1458 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1459 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1460 }
1461
1462 async fn get_client_version_v1(
1466 &self,
1467 client: ClientVersionV1,
1468 ) -> RpcResult<Vec<ClientVersionV1>> {
1469 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1470 Ok(Self::get_client_version_v1(self, client)?)
1471 }
1472
1473 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1476 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1477
1478 let el_caps = self.capabilities();
1479 el_caps.log_capability_mismatches(&capabilities);
1480
1481 Ok(el_caps.list())
1482 }
1483
1484 async fn get_blobs_v1(
1485 &self,
1486 versioned_hashes: Vec<B256>,
1487 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1488 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1489 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1490 }
1491
1492 async fn get_blobs_v2(
1493 &self,
1494 versioned_hashes: Vec<B256>,
1495 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1496 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1497 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1498 }
1499
1500 async fn get_blobs_v3(
1501 &self,
1502 versioned_hashes: Vec<B256>,
1503 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1504 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1505 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1506 }
1507
1508 async fn get_blobs_v4(
1509 &self,
1510 versioned_hashes: Vec<B256>,
1511 indices_bitarray: B128,
1512 ) -> RpcResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1513 trace!(target: "rpc::engine", "Serving engine_getBlobsV4");
1514 Ok(self.get_blobs_v4_metered(versioned_hashes, indices_bitarray)?)
1515 }
1516}
1517
1518impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1519 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1520where
1521 EngineT: EngineTypes,
1522 Self: EngineApiServer<EngineT>,
1523{
1524 fn into_rpc_module(self) -> RpcModule<()> {
1525 EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1526 }
1527}
1528
1529impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1530 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1531where
1532 PayloadT: PayloadTypes,
1533{
1534 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1535 f.debug_struct("EngineApi").finish_non_exhaustive()
1536 }
1537}
1538
1539impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1540 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1541where
1542 PayloadT: PayloadTypes,
1543{
1544 fn clone(&self) -> Self {
1545 Self { inner: Arc::clone(&self.inner) }
1546 }
1547}
1548
/// State behind the `Arc` held by [`EngineApi`]; cloning the API shares this value.
struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Storage provider. The `EngineApiServer` impl requires it to implement
    /// `HeaderProvider + BlockReader + StateProviderFactory + BalProvider`.
    provider: Provider,
    /// Shared chain specification.
    chain_spec: Arc<ChainSpec>,
    /// Handle to the consensus engine. NOTE(review): presumably the channel over which
    /// new-payload and forkchoice messages are sent — confirm against the `*_metered` methods.
    beacon_consensus: ConsensusEngineHandle<PayloadT>,
    /// Payload store. NOTE(review): presumably backs the `get_payload_*` methods — confirm.
    payload_store: PayloadStore<PayloadT>,
    /// Task runtime. Its usage is not visible in this part of the file.
    task_spawner: Runtime,
    /// Metrics recorded by the `*_metered` methods (e.g. latency and blob hit/miss counters).
    metrics: EngineApiMetrics,
    /// Client version information. NOTE(review): presumably what `get_client_version_v1`
    /// reports — confirm.
    client: ClientVersionV1,
    /// Engine capabilities. NOTE(review): presumably what `capabilities()` exposes to
    /// `exchange_capabilities` — confirm.
    capabilities: EngineCapabilities,
    /// Transaction pool. NOTE(review): presumably the source for the `get_blobs_*` methods —
    /// confirm.
    tx_pool: Pool,
    /// Engine API validator for the payload types.
    validator: Validator,
    /// When `false`, `new_payload_v4`/`new_payload_v5` reject requests passed as a hash with
    /// `EngineApiError::UnexpectedRequestsHash`.
    accept_execution_requests_hash: bool,
    /// Cell custody state. NOTE(review): the tests expect `fork_choice_updated_v4` to update a
    /// value shared with the network handle — confirm this is that value.
    cell_custody: CellCustody,
    /// Callback returning a `bool` sync flag. NOTE(review): presumably derived from the network
    /// handle passed to `EngineApi::new` — confirm.
    is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
}
1577
1578#[cfg(test)]
1579mod tests {
1580 use super::*;
1581 use alloy_eips::{eip7685::Requests, NumHash};
1582 use alloy_primitives::{Address, Bytes, B256};
1583 use alloy_rpc_types_engine::{
1584 ClientCode, ClientVersionV1, ExecutionPayloadV2, PayloadAttributes, PayloadStatusEnum,
1585 };
1586 use assert_matches::assert_matches;
1587 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1588 use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1589 use reth_ethereum_engine_primitives::EthEngineTypes;
1590 use reth_ethereum_primitives::Block;
1591 use reth_network_api::{
1592 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1593 };
1594 use reth_node_ethereum::EthereumEngineValidator;
1595 use reth_payload_builder::test_utils::spawn_test_payload_service;
1596 use reth_provider::{test_utils::MockEthProvider, BalStoreHandle, InMemoryBalStore, RawBal};
1597 use reth_tasks::Runtime;
1598 use reth_transaction_pool::noop::NoopTransactionPool;
1599 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1600
1601 fn setup_engine_api() -> (
1602 EngineApiTestHandle,
1603 EngineApi<
1604 Arc<MockEthProvider>,
1605 EthEngineTypes,
1606 NoopTransactionPool,
1607 EthereumEngineValidator,
1608 ChainSpec,
1609 >,
1610 ) {
1611 let client = ClientVersionV1 {
1612 code: ClientCode::RH,
1613 name: "Reth".to_string(),
1614 version: "v0.2.0-beta.5".to_string(),
1615 commit: "defa64b2".to_string(),
1616 };
1617
1618 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1619 let provider = Arc::new(MockEthProvider::default());
1620 let payload_store = spawn_test_payload_service();
1621 let (to_engine, engine_rx) = unbounded_channel();
1622 let task_executor = Runtime::test();
1623 let api = EngineApi::new(
1624 provider.clone(),
1625 chain_spec.clone(),
1626 ConsensusEngineHandle::new(to_engine),
1627 payload_store.into(),
1628 NoopTransactionPool::default(),
1629 task_executor,
1630 client,
1631 EngineCapabilities::default(),
1632 EthereumEngineValidator::new(chain_spec.clone()),
1633 false,
1634 NoopNetwork::default(),
1635 );
1636 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1637 (handle, api)
1638 }
1639
1640 #[tokio::test]
1641 async fn engine_client_version_v1() {
1642 let client = ClientVersionV1 {
1643 code: ClientCode::RH,
1644 name: "Reth".to_string(),
1645 version: "v0.2.0-beta.5".to_string(),
1646 commit: "defa64b2".to_string(),
1647 };
1648 let (_, api) = setup_engine_api();
1649 let res = api.get_client_version_v1(client.clone());
1650 assert_eq!(res.unwrap(), vec![client]);
1651 }
1652
1653 #[tokio::test]
1654 async fn get_payload_bodies_by_hash_v2_returns_block_access_list_from_store() {
1655 let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1656 let mut provider = MockEthProvider::default();
1657 provider.bal_store = bal_store.clone();
1658 let provider = Arc::new(provider);
1659
1660 let client = ClientVersionV1 {
1661 code: ClientCode::RH,
1662 name: "Reth".to_string(),
1663 version: "v0.2.0-beta.5".to_string(),
1664 commit: "defa64b2".to_string(),
1665 };
1666 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1667 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1668 let (to_engine, _engine_rx) = unbounded_channel();
1669 let api = EngineApi::new(
1670 provider.clone(),
1671 chain_spec.clone(),
1672 ConsensusEngineHandle::new(to_engine),
1673 payload_store.into(),
1674 NoopTransactionPool::default(),
1675 Runtime::test(),
1676 client,
1677 EngineCapabilities::default(),
1678 EthereumEngineValidator::new(chain_spec),
1679 false,
1680 NoopNetwork::default(),
1681 );
1682
1683 let mut block = Block::default();
1684 block.header.number = 1;
1685 let block_hash = block.header.hash_slow();
1686 provider.add_block(block_hash, block);
1687
1688 let mut block_without_bal = Block::default();
1689 block_without_bal.header.number = 2;
1690 let block_without_bal_hash = block_without_bal.header.hash_slow();
1691 provider.add_block(block_without_bal_hash, block_without_bal);
1692
1693 let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1694 bal_store.insert(NumHash::new(1, block_hash), RawBal::new(raw_bal.clone())).unwrap();
1695
1696 let missing_hash = B256::with_last_byte(3);
1697 let response = api
1698 .get_payload_bodies_by_hash_v2(vec![block_hash, block_without_bal_hash, missing_hash])
1699 .await
1700 .unwrap();
1701
1702 assert_eq!(response.len(), 3);
1703 assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1704 assert_eq!(response[1].as_ref().unwrap().block_access_list, None);
1705 assert!(response[2].is_none());
1706 }
1707
1708 #[tokio::test]
1709 async fn get_payload_bodies_by_range_v2_returns_block_access_lists_from_store() {
1710 let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1711 let mut provider = MockEthProvider::default();
1712 provider.bal_store = bal_store.clone();
1713 let provider = Arc::new(provider);
1714
1715 let client = ClientVersionV1 {
1716 code: ClientCode::RH,
1717 name: "Reth".to_string(),
1718 version: "v0.2.0-beta.5".to_string(),
1719 commit: "defa64b2".to_string(),
1720 };
1721 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1722 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1723 let (to_engine, _engine_rx) = unbounded_channel();
1724 let api = EngineApi::new(
1725 provider.clone(),
1726 chain_spec.clone(),
1727 ConsensusEngineHandle::new(to_engine),
1728 payload_store.into(),
1729 NoopTransactionPool::default(),
1730 Runtime::test(),
1731 client,
1732 EngineCapabilities::default(),
1733 EthereumEngineValidator::new(chain_spec),
1734 false,
1735 NoopNetwork::default(),
1736 );
1737
1738 let mut block = Block::default();
1739 block.header.number = 1;
1740 let block_hash = block.header.hash_slow();
1741 provider.add_block(block_hash, block);
1742
1743 let mut block_without_bal = Block::default();
1744 block_without_bal.header.number = 3;
1745 let block_without_bal_hash = block_without_bal.header.hash_slow();
1746 provider.add_block(block_without_bal_hash, block_without_bal);
1747
1748 let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1749 bal_store.insert(NumHash::new(1, block_hash), RawBal::new(raw_bal.clone())).unwrap();
1750
1751 let response = api.get_payload_bodies_by_range_v2(1, 3).await.unwrap();
1752
1753 assert_eq!(response.len(), 3);
1754 assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1755 assert!(response[1].is_none());
1756 assert_eq!(response[2].as_ref().unwrap().block_access_list, None);
1757 }
1758
    /// Test-side counterpart of the API built by `setup_engine_api`.
    struct EngineApiTestHandle {
        // No test in this module reads the chain spec, hence the lint suppression.
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        /// The mock provider handed to the API; tests add blocks through it.
        provider: Arc<MockEthProvider>,
        /// Receiving end of the channel wrapped by the API's `ConsensusEngineHandle`.
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }
1765
1766 #[tokio::test]
1767 async fn forwards_responses_to_consensus_engine() {
1768 let (mut handle, api) = setup_engine_api();
1769
1770 tokio::spawn(async move {
1771 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1772 let execution_data = ExecutionData {
1773 payload: payload_v1.into(),
1774 sidecar: ExecutionPayloadSidecar::none(),
1775 };
1776
1777 api.new_payload_v1(execution_data).await.unwrap();
1778 });
1779 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1780 }
1781
1782 #[tokio::test]
1783 async fn new_payload_v5_accepts_amsterdam_payloads() {
1784 let chain_spec = Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1785 let provider = Arc::new(MockEthProvider::default());
1786 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1787 let (to_engine, mut engine_rx) = unbounded_channel();
1788
1789 let api = EngineApi::new(
1790 provider,
1791 chain_spec.clone(),
1792 ConsensusEngineHandle::new(to_engine),
1793 payload_store.into(),
1794 NoopTransactionPool::default(),
1795 Runtime::test(),
1796 ClientVersionV1 {
1797 code: ClientCode::RH,
1798 name: "Reth".to_string(),
1799 version: "v0.0.0-test".to_string(),
1800 commit: "test".to_string(),
1801 },
1802 EngineCapabilities::default(),
1803 EthereumEngineValidator::new(chain_spec),
1804 false,
1805 NoopNetwork::default(),
1806 );
1807
1808 tokio::spawn(async move {
1809 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1810 let payload = ExecutionPayloadV4 {
1811 payload_inner: ExecutionPayloadV3 {
1812 payload_inner: ExecutionPayloadV2 {
1813 payload_inner: payload_v1,
1814 withdrawals: Vec::new(),
1815 },
1816 blob_gas_used: 0,
1817 excess_blob_gas: 0,
1818 },
1819 block_access_list: Bytes::from_static(b"bal"),
1820 slot_number: 1,
1821 };
1822 let execution_data = ExecutionData {
1823 payload: payload.into(),
1824 sidecar: ExecutionPayloadSidecar::v4(
1825 CancunPayloadFields {
1826 versioned_hashes: Vec::new(),
1827 parent_beacon_block_root: B256::ZERO,
1828 },
1829 PraguePayloadFields { requests: RequestsOrHash::Requests(Requests::default()) },
1830 ),
1831 };
1832
1833 api.new_payload_v5(execution_data).await.unwrap();
1834 });
1835
1836 assert_matches!(engine_rx.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1837 }
1838
    /// Minimal [`NetworkInfo`] stub whose sync state is fixed at construction.
    #[derive(Clone)]
    struct TestNetworkInfo {
        /// Value reported by both `is_syncing` and `is_initially_syncing`.
        syncing: bool,
    }
1843
    /// Stub implementation: everything is a fixed placeholder except the sync flags, which
    /// mirror the `syncing` field.
    impl NetworkInfo for TestNetworkInfo {
        /// Returns the unspecified IPv4 address with port 0.
        fn local_addr(&self) -> std::net::SocketAddr {
            (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
        }

        /// Returns a fixed placeholder status.
        async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
            // NOTE(review): the suppression suggests one of the fields populated below is
            // deprecated — confirm which one before narrowing or removing it.
            #[allow(deprecated)]
            Ok(NetworkStatus {
                client_version: "test".to_string(),
                protocol_version: 5,
                eth_protocol_info: EthProtocolInfo {
                    network: 1,
                    difficulty: None,
                    genesis: Default::default(),
                    config: Default::default(),
                    head: Default::default(),
                },
                capabilities: vec![],
            })
        }

        /// Always reports chain id 1.
        fn chain_id(&self) -> u64 {
            1
        }

        /// Returns a lazily initialised, process-wide default [`CellCustody`]; all instances of
        /// this stub hand out the same value.
        fn cell_custody(&self) -> &CellCustody {
            static CELL_CUSTODY: std::sync::OnceLock<CellCustody> = std::sync::OnceLock::new();
            CELL_CUSTODY.get_or_init(CellCustody::default)
        }

        /// Mirrors the `syncing` field.
        fn is_syncing(&self) -> bool {
            self.syncing
        }

        /// Mirrors the `syncing` field.
        fn is_initially_syncing(&self) -> bool {
            self.syncing
        }
    }
1882
1883 #[tokio::test]
1884 async fn get_blobs_v3_returns_null_when_syncing() {
1885 let chain_spec: Arc<ChainSpec> =
1886 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1887 let provider = Arc::new(MockEthProvider::default());
1888 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1889 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1890
1891 let api = EngineApi::new(
1892 provider,
1893 chain_spec.clone(),
1894 ConsensusEngineHandle::new(to_engine),
1895 payload_store.into(),
1896 NoopTransactionPool::default(),
1897 Runtime::test(),
1898 ClientVersionV1 {
1899 code: ClientCode::RH,
1900 name: "Reth".to_string(),
1901 version: "v0.0.0-test".to_string(),
1902 commit: "test".to_string(),
1903 },
1904 EngineCapabilities::default(),
1905 EthereumEngineValidator::new(chain_spec),
1906 false,
1907 TestNetworkInfo { syncing: true },
1908 );
1909
1910 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1911 assert_matches!(res, Ok(None));
1912 }
1913
1914 #[tokio::test]
1915 async fn get_blobs_v4_returns_null_when_syncing() {
1916 let chain_spec: Arc<ChainSpec> =
1917 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1918 let provider = Arc::new(MockEthProvider::default());
1919 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1920 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1921
1922 let api = EngineApi::new(
1923 provider,
1924 chain_spec.clone(),
1925 ConsensusEngineHandle::new(to_engine),
1926 payload_store.into(),
1927 NoopTransactionPool::default(),
1928 Runtime::test(),
1929 ClientVersionV1 {
1930 code: ClientCode::RH,
1931 name: "Reth".to_string(),
1932 version: "v0.0.0-test".to_string(),
1933 commit: "test".to_string(),
1934 },
1935 EngineCapabilities::default(),
1936 EthereumEngineValidator::new(chain_spec),
1937 false,
1938 TestNetworkInfo { syncing: true },
1939 );
1940
1941 let res = api.get_blobs_v4_metered(vec![B256::ZERO], B128::from(1u128));
1942 assert_matches!(res, Ok(None));
1943 }
1944
1945 #[tokio::test]
1946 async fn fcu_v4_updates_shared_cell_custody_before_forkchoice_result() {
1947 let chain_spec: Arc<ChainSpec> =
1948 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1949 let provider = Arc::new(MockEthProvider::default());
1950 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1951 let (to_engine, mut engine_rx) = unbounded_channel();
1952 let network = NoopNetwork::default();
1953 let cell_custody = network.cell_custody().clone();
1954
1955 let api = EngineApi::new(
1956 provider,
1957 chain_spec.clone(),
1958 ConsensusEngineHandle::new(to_engine),
1959 payload_store.into(),
1960 NoopTransactionPool::default(),
1961 Runtime::test(),
1962 ClientVersionV1 {
1963 code: ClientCode::RH,
1964 name: "Reth".to_string(),
1965 version: "v0.0.0-test".to_string(),
1966 commit: "test".to_string(),
1967 },
1968 EngineCapabilities::default(),
1969 EthereumEngineValidator::new(chain_spec),
1970 false,
1971 network,
1972 );
1973
1974 let state = ForkchoiceState {
1975 head_block_hash: B256::from([0x33; 32]),
1976 safe_block_hash: B256::ZERO,
1977 finalized_block_hash: B256::ZERO,
1978 };
1979 let custody_columns = B128::from(0b1010u128);
1980
1981 let api_task = tokio::spawn(async move {
1982 api.fork_choice_updated_v4(state, None, Some(custody_columns)).await
1983 });
1984
1985 let request = tokio::time::timeout(std::time::Duration::from_secs(1), engine_rx.recv())
1986 .await
1987 .expect("timed out waiting for forkchoiceUpdated request")
1988 .expect("expected forkchoiceUpdated request");
1989 let response_tx = match request {
1990 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1991 assert!(payload_attrs.is_none());
1992 tx
1993 }
1994 other => panic!("unexpected engine message: {other:?}"),
1995 };
1996 assert_eq!(cell_custody.get(), custody_columns);
1997
1998 response_tx
1999 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2000 PayloadStatusEnum::Valid,
2001 ))))
2002 .expect("send valid response");
2003
2004 api_task
2005 .await
2006 .expect("api task should not panic")
2007 .expect("forkchoiceUpdatedV4 should succeed");
2008 assert_eq!(cell_custody.get(), custody_columns);
2009 }
2010
2011 #[tokio::test]
2012 async fn fcu_v4_updates_shared_cell_custody_when_payload_attrs_invalid() {
2013 let chain_spec: Arc<ChainSpec> =
2014 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
2015 let provider = Arc::new(MockEthProvider::default());
2016 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
2017 let (to_engine, mut engine_rx) = unbounded_channel();
2018 let network = NoopNetwork::default();
2019 let cell_custody = network.cell_custody().clone();
2020
2021 let api = EngineApi::new(
2022 provider,
2023 chain_spec.clone(),
2024 ConsensusEngineHandle::new(to_engine),
2025 payload_store.into(),
2026 NoopTransactionPool::default(),
2027 Runtime::test(),
2028 ClientVersionV1 {
2029 code: ClientCode::RH,
2030 name: "Reth".to_string(),
2031 version: "v0.0.0-test".to_string(),
2032 commit: "test".to_string(),
2033 },
2034 EngineCapabilities::default(),
2035 EthereumEngineValidator::new(chain_spec),
2036 false,
2037 network,
2038 );
2039
2040 let state = ForkchoiceState {
2041 head_block_hash: B256::from([0x44; 32]),
2042 safe_block_hash: B256::ZERO,
2043 finalized_block_hash: B256::ZERO,
2044 };
2045 let payload_attributes = PayloadAttributes {
2046 timestamp: 1,
2047 prev_randao: B256::ZERO,
2048 suggested_fee_recipient: Address::ZERO,
2049 withdrawals: Some(vec![]),
2050 parent_beacon_block_root: None,
2051 slot_number: None,
2052 target_gas_limit: None,
2053 };
2054 let custody_columns = B128::from(0b1010u128);
2055
2056 let api_task = tokio::spawn(async move {
2057 api.fork_choice_updated_v4(state, Some(payload_attributes), Some(custody_columns)).await
2058 });
2059
2060 let request = tokio::time::timeout(std::time::Duration::from_secs(1), engine_rx.recv())
2061 .await
2062 .expect("timed out waiting for forkchoiceUpdated request")
2063 .expect("expected forkchoiceUpdated request");
2064 let response_tx = match request {
2065 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2066 assert!(
2067 payload_attrs.is_none(),
2068 "when attrs are invalid, API should first evaluate forkchoice without attrs"
2069 );
2070 tx
2071 }
2072 other => panic!("unexpected engine message: {other:?}"),
2073 };
2074 assert_eq!(cell_custody.get(), custody_columns);
2075
2076 response_tx
2077 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2078 PayloadStatusEnum::Valid,
2079 ))))
2080 .expect("send valid response");
2081
2082 let response = api_task.await.expect("api task should not panic");
2083 assert_matches!(
2084 response,
2085 Err(EngineApiError::EngineObjectValidationError(
2086 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
2087 ))
2088 );
2089 }
2090
2091 #[tokio::test]
2092 async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
2093 let (mut handle, api) = setup_engine_api();
2094
2095 let state = ForkchoiceState {
2096 head_block_hash: B256::from([0x11; 32]),
2097 safe_block_hash: B256::ZERO,
2098 finalized_block_hash: B256::ZERO,
2099 };
2100 let payload_attributes = PayloadAttributes {
2101 timestamp: 1,
2102 prev_randao: B256::ZERO,
2103 suggested_fee_recipient: Address::ZERO,
2104 withdrawals: Some(vec![]),
2105 parent_beacon_block_root: None,
2107 slot_number: None,
2108 target_gas_limit: None,
2109 };
2110
2111 let api_task = tokio::spawn(async move {
2112 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
2113 });
2114
2115 let request =
2116 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
2117 .await
2118 .expect("timed out waiting for forkchoiceUpdated request")
2119 .expect("expected forkchoiceUpdated request");
2120 let response_tx = match request {
2121 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2122 assert!(
2123 payload_attrs.is_none(),
2124 "FCU for syncing state should be evaluated before payload attributes"
2125 );
2126 tx
2127 }
2128 other => panic!("unexpected engine message: {other:?}"),
2129 };
2130
2131 response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
2132
2133 let response = api_task
2134 .await
2135 .expect("api task should not panic")
2136 .expect("forkchoiceUpdatedV3 should return a syncing response");
2137 assert!(response.payload_status.is_syncing());
2138 assert!(response.payload_id.is_none());
2139 }
2140
    /// When the engine reports the forkchoice state as valid but the payload attributes lack a
    /// parent beacon block root, `fork_choice_updated_v3` fails with a payload-attributes
    /// validation error and sends no second forkchoice message to the engine.
    #[tokio::test]
    async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
        let (mut handle, api) = setup_engine_api();

        let state = ForkchoiceState {
            head_block_hash: B256::from([0x22; 32]),
            safe_block_hash: B256::ZERO,
            finalized_block_hash: B256::ZERO,
        };
        // `parent_beacon_block_root` is deliberately left out.
        let payload_attributes = PayloadAttributes {
            timestamp: 1,
            prev_randao: B256::ZERO,
            suggested_fee_recipient: Address::ZERO,
            withdrawals: Some(vec![]),
            parent_beacon_block_root: None,
            slot_number: None,
            target_gas_limit: None,
        };

        // Run the call in the background so this task can play the engine's role.
        let api_task = tokio::spawn(async move {
            api.fork_choice_updated_v3(state, Some(payload_attributes)).await
        });

        let request =
            tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
                .await
                .expect("timed out waiting for forkchoiceUpdated request")
                .expect("expected forkchoiceUpdated request");

        // The first message must carry the forkchoice state only.
        let response_tx = match request {
            BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
                assert!(
                    payload_attrs.is_none(),
                    "when attrs are invalid, API should first evaluate forkchoice without attrs"
                );
                tx
            }
            other => panic!("unexpected engine message: {other:?}"),
        };

        // Answer as the engine would for a valid forkchoice state.
        response_tx
            .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
                PayloadStatusEnum::Valid,
            ))))
            .expect("send valid response");

        let response = api_task.await.expect("api task should not panic");
        assert_matches!(
            response,
            Err(EngineApiError::EngineObjectValidationError(
                reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
            ))
        );

        // Either a timeout or a closed channel proves that no follow-up message was sent.
        match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
            .await
        {
            Err(_) | Ok(None) => {}
            Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
                panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
            }
            Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
        }
    }
2205
2206 mod get_payload_bodies {
2208 use super::*;
2209 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
2210 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
2211
2212 #[tokio::test]
2213 async fn invalid_params() {
2214 let (_, api) = setup_engine_api();
2215
2216 let by_range_tests = [
2217 (0, 0),
2219 (0, 1),
2220 (1, 0),
2221 ];
2222
2223 for (start, count) in by_range_tests {
2225 let res = api.get_payload_bodies_by_range_v1(start, count).await;
2226 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
2227 }
2228 }
2229
2230 #[tokio::test]
2231 async fn request_too_large() {
2232 let (_, api) = setup_engine_api();
2233
2234 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
2235 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
2236 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
2237 }
2238
2239 #[tokio::test]
2240 async fn returns_payload_bodies() {
2241 let mut rng = generators::rng();
2242 let (handle, api) = setup_engine_api();
2243
2244 let (start, count) = (1, 10);
2245 let blocks = random_block_range(
2246 &mut rng,
2247 start..=start + count - 1,
2248 BlockRangeParams { tx_count: 0..2, ..Default::default() },
2249 );
2250 handle
2251 .provider
2252 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
2253
2254 let expected = blocks
2255 .iter()
2256 .cloned()
2257 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
2258 .collect::<Vec<_>>();
2259
2260 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2261 assert_eq!(res, expected);
2262 }
2263
2264 #[tokio::test]
2265 async fn returns_payload_bodies_with_gaps() {
2266 let mut rng = generators::rng();
2267 let (handle, api) = setup_engine_api();
2268
2269 let (start, count) = (1, 100);
2270 let blocks = random_block_range(
2271 &mut rng,
2272 start..=start + count - 1,
2273 BlockRangeParams { tx_count: 0..2, ..Default::default() },
2274 );
2275
2276 let first_missing_range = 26..=50;
2278 let second_missing_range = 76..=100;
2279 handle.provider.extend_blocks(
2280 blocks
2281 .iter()
2282 .filter(|b| {
2283 !first_missing_range.contains(&b.number) &&
2284 !second_missing_range.contains(&b.number)
2285 })
2286 .map(|b| (b.hash(), b.clone().into_block())),
2287 );
2288
2289 let expected = blocks
2290 .iter()
2291 .filter(|b| !second_missing_range.contains(&b.number))
2294 .cloned()
2295 .map(|b| {
2296 if first_missing_range.contains(&b.number) {
2297 None
2298 } else {
2299 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2300 }
2301 })
2302 .collect::<Vec<_>>();
2303
2304 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2305 assert_eq!(res, expected);
2306
2307 let expected = blocks
2308 .iter()
2309 .cloned()
2310 .map(|b| {
2313 if first_missing_range.contains(&b.number) ||
2314 second_missing_range.contains(&b.number)
2315 {
2316 None
2317 } else {
2318 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2319 }
2320 })
2321 .collect::<Vec<_>>();
2322
2323 let hashes = blocks.iter().map(|b| b.hash()).collect();
2324 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
2325 assert_eq!(res, expected);
2326 }
2327 }
2328}