1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B128, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::{CellCustody, NetworkInfo};
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BalProvider, BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
/// Alias for the sending half of a [`oneshot`] channel that carries an [`EngineApiResult`].
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
/// Maximum number of payload bodies (or block hashes) a single payload-bodies request may ask
/// for; larger requests are rejected with `EngineApiError::PayloadRequestTooLarge`.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
45
/// Maximum number of versioned hashes a single `get_blobs_*` request may contain; larger
/// requests are rejected with `EngineApiError::BlobRequestTooLarge`.
const MAX_BLOB_LIMIT: usize = 128;
48
/// The Engine API implementation.
///
/// This is a thin handle: all state lives in a shared `EngineApiInner` behind an [`Arc`].
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Shared inner state (provider, consensus handle, payload store, metrics, ...).
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73 &self.inner.chain_spec
74 }
75}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
81 PayloadT: PayloadTypes,
82 Pool: TransactionPool + 'static,
83 Validator: EngineApiValidator<PayloadT>,
84 ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86 #[expect(clippy::too_many_arguments)]
88 pub fn new(
89 provider: Provider,
90 chain_spec: Arc<ChainSpec>,
91 beacon_consensus: ConsensusEngineHandle<PayloadT>,
92 payload_store: PayloadStore<PayloadT>,
93 tx_pool: Pool,
94 task_spawner: Runtime,
95 client: ClientVersionV1,
96 capabilities: EngineCapabilities,
97 validator: Validator,
98 accept_execution_requests_hash: bool,
99 network: impl NetworkInfo + 'static,
100 ) -> Self {
101 let cell_custody = network.cell_custody().clone();
102 let is_syncing = Arc::new(move || network.is_syncing());
103 let inner = Arc::new(EngineApiInner {
104 provider,
105 chain_spec,
106 beacon_consensus,
107 payload_store,
108 task_spawner,
109 metrics: EngineApiMetrics::default(),
110 client,
111 capabilities,
112 tx_pool,
113 validator,
114 accept_execution_requests_hash,
115 cell_custody,
116 is_syncing,
117 });
118 Self { inner }
119 }
120
121 pub fn get_client_version_v1(
123 &self,
124 _client: ClientVersionV1,
125 ) -> EngineApiResult<Vec<ClientVersionV1>> {
126 Ok(vec![self.inner.client.clone()])
127 }
128
129 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
131 Ok(self
132 .inner
133 .payload_store
134 .payload_timestamp(payload_id)
135 .await
136 .ok_or(EngineApiError::UnknownPayload)??)
137 }
138
139 pub async fn new_payload_v1(
142 &self,
143 payload: PayloadT::ExecutionData,
144 ) -> EngineApiResult<PayloadStatus> {
145 let payload_or_attrs = PayloadOrAttributes::<
146 '_,
147 PayloadT::ExecutionData,
148 PayloadT::PayloadAttributes,
149 >::from_execution_payload(&payload);
150
151 self.inner
152 .validator
153 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
154
155 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
156 }
157
158 pub async fn new_payload_v1_metered(
160 &self,
161 payload: PayloadT::ExecutionData,
162 ) -> EngineApiResult<PayloadStatus> {
163 let start = Instant::now();
164 let res = Self::new_payload_v1(self, payload).await;
165 let elapsed = start.elapsed();
166 self.inner.metrics.latency.new_payload_v1.record(elapsed);
167 res
168 }
169
170 pub async fn new_payload_v2(
172 &self,
173 payload: PayloadT::ExecutionData,
174 ) -> EngineApiResult<PayloadStatus> {
175 let payload_or_attrs = PayloadOrAttributes::<
176 '_,
177 PayloadT::ExecutionData,
178 PayloadT::PayloadAttributes,
179 >::from_execution_payload(&payload);
180 self.inner
181 .validator
182 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
183 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
184 }
185
186 pub async fn new_payload_v2_metered(
188 &self,
189 payload: PayloadT::ExecutionData,
190 ) -> EngineApiResult<PayloadStatus> {
191 let start = Instant::now();
192 let res = Self::new_payload_v2(self, payload).await;
193 let elapsed = start.elapsed();
194 self.inner.metrics.latency.new_payload_v2.record(elapsed);
195 res
196 }
197
198 pub async fn new_payload_v3(
200 &self,
201 payload: PayloadT::ExecutionData,
202 ) -> EngineApiResult<PayloadStatus> {
203 let payload_or_attrs = PayloadOrAttributes::<
204 '_,
205 PayloadT::ExecutionData,
206 PayloadT::PayloadAttributes,
207 >::from_execution_payload(&payload);
208 self.inner
209 .validator
210 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
211
212 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
213 }
214
215 pub async fn new_payload_v3_metered(
217 &self,
218 payload: PayloadT::ExecutionData,
219 ) -> RpcResult<PayloadStatus> {
220 let start = Instant::now();
221
222 let res = Self::new_payload_v3(self, payload).await;
223 let elapsed = start.elapsed();
224 self.inner.metrics.latency.new_payload_v3.record(elapsed);
225 Ok(res?)
226 }
227
228 pub async fn new_payload_v4(
230 &self,
231 payload: PayloadT::ExecutionData,
232 ) -> EngineApiResult<PayloadStatus> {
233 let payload_or_attrs = PayloadOrAttributes::<
234 '_,
235 PayloadT::ExecutionData,
236 PayloadT::PayloadAttributes,
237 >::from_execution_payload(&payload);
238 self.inner
239 .validator
240 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
241
242 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
243 }
244
245 pub async fn new_payload_v4_metered(
247 &self,
248 payload: PayloadT::ExecutionData,
249 ) -> RpcResult<PayloadStatus> {
250 let start = Instant::now();
251 let res = Self::new_payload_v4(self, payload).await;
252
253 let elapsed = start.elapsed();
254 self.inner.metrics.latency.new_payload_v4.record(elapsed);
255 Ok(res?)
256 }
257
258 pub async fn new_payload_v5(
264 &self,
265 payload: PayloadT::ExecutionData,
266 ) -> EngineApiResult<PayloadStatus> {
267 let payload_or_attrs = PayloadOrAttributes::<
268 '_,
269 PayloadT::ExecutionData,
270 PayloadT::PayloadAttributes,
271 >::from_execution_payload(&payload);
272 self.inner
273 .validator
274 .validate_version_specific_fields(EngineApiMessageVersion::V5, payload_or_attrs)?;
275 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
276 }
277
278 pub async fn new_payload_v5_metered(
280 &self,
281 payload: PayloadT::ExecutionData,
282 ) -> RpcResult<PayloadStatus> {
283 let start = Instant::now();
284 let res = Self::new_payload_v5(self, payload).await;
285 let elapsed = start.elapsed();
286 self.inner.metrics.latency.new_payload_v5.record(elapsed);
287 Ok(res?)
288 }
289
290 pub fn accept_execution_requests_hash(&self) -> bool {
292 self.inner.accept_execution_requests_hash
293 }
294}
295
296impl<Provider, EngineT, Pool, Validator, ChainSpec>
297 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
298where
299 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
300 EngineT: EngineTypes,
301 Pool: TransactionPool + 'static,
302 Validator: EngineApiValidator<EngineT>,
303 ChainSpec: EthereumHardforks + Send + Sync + 'static,
304{
305 pub async fn fork_choice_updated_v1(
312 &self,
313 state: ForkchoiceState,
314 payload_attrs: Option<EngineT::PayloadAttributes>,
315 ) -> EngineApiResult<ForkchoiceUpdated> {
316 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
317 .await
318 }
319
320 pub async fn fork_choice_updated_v1_metered(
322 &self,
323 state: ForkchoiceState,
324 payload_attrs: Option<EngineT::PayloadAttributes>,
325 ) -> EngineApiResult<ForkchoiceUpdated> {
326 let start = Instant::now();
327 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
328 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
329 res
330 }
331
332 pub async fn fork_choice_updated_v2(
337 &self,
338 state: ForkchoiceState,
339 payload_attrs: Option<EngineT::PayloadAttributes>,
340 ) -> EngineApiResult<ForkchoiceUpdated> {
341 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
342 .await
343 }
344
345 pub async fn fork_choice_updated_v2_metered(
347 &self,
348 state: ForkchoiceState,
349 payload_attrs: Option<EngineT::PayloadAttributes>,
350 ) -> EngineApiResult<ForkchoiceUpdated> {
351 let start = Instant::now();
352 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
353 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
354 res
355 }
356
357 pub async fn fork_choice_updated_v3(
362 &self,
363 state: ForkchoiceState,
364 payload_attrs: Option<EngineT::PayloadAttributes>,
365 ) -> EngineApiResult<ForkchoiceUpdated> {
366 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
367 .await
368 }
369
370 pub async fn fork_choice_updated_v3_metered(
372 &self,
373 state: ForkchoiceState,
374 payload_attrs: Option<EngineT::PayloadAttributes>,
375 ) -> EngineApiResult<ForkchoiceUpdated> {
376 let start = Instant::now();
377 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
378 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
379 res
380 }
381
382 pub async fn fork_choice_updated_v4(
387 &self,
388 state: ForkchoiceState,
389 payload_attrs: Option<EngineT::PayloadAttributes>,
390 custody_columns: Option<B128>,
391 ) -> EngineApiResult<ForkchoiceUpdated> {
392 if let Some(custody_columns) = custody_columns {
393 self.inner.cell_custody.set(custody_columns);
394 }
395 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V4, state, payload_attrs)
396 .await
397 }
398
399 pub async fn fork_choice_updated_v4_metered(
401 &self,
402 state: ForkchoiceState,
403 payload_attrs: Option<EngineT::PayloadAttributes>,
404 custody_columns: Option<B128>,
405 ) -> EngineApiResult<ForkchoiceUpdated> {
406 let start = Instant::now();
407 let res = Self::fork_choice_updated_v4(self, state, payload_attrs, custody_columns).await;
408 self.inner.metrics.latency.fork_choice_updated_v4.record(start.elapsed());
409 res
410 }
411
412 async fn get_built_payload(
414 &self,
415 payload_id: PayloadId,
416 ) -> EngineApiResult<EngineT::BuiltPayload> {
417 self.inner
418 .payload_store
419 .resolve(payload_id)
420 .await
421 .ok_or(EngineApiError::UnknownPayload)?
422 .map_err(|_| EngineApiError::UnknownPayload)
423 }
424
425 async fn get_payload_inner<R>(
428 &self,
429 payload_id: PayloadId,
430 version: EngineApiMessageVersion,
431 ) -> EngineApiResult<R>
432 where
433 EngineT::BuiltPayload: TryInto<R>,
434 {
435 let timestamp = self.get_payload_timestamp(payload_id).await?;
438 validate_payload_timestamp(
439 &self.inner.chain_spec,
440 version,
441 timestamp,
442 MessageValidationKind::GetPayload,
443 )?;
444
445 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
447 warn!(?version, "could not transform built payload");
448 EngineApiError::UnknownPayload
449 })
450 }
451
452 pub async fn get_payload_v1(
462 &self,
463 payload_id: PayloadId,
464 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
465 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
466 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
467 EngineApiError::UnknownPayload
468 })
469 }
470
471 pub async fn get_payload_v1_metered(
473 &self,
474 payload_id: PayloadId,
475 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
476 let start = Instant::now();
477 let res = Self::get_payload_v1(self, payload_id).await;
478 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
479 res
480 }
481
482 pub async fn get_payload_v2(
490 &self,
491 payload_id: PayloadId,
492 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
493 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
494 }
495
496 pub async fn get_payload_v2_metered(
498 &self,
499 payload_id: PayloadId,
500 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
501 let start = Instant::now();
502 let res = Self::get_payload_v2(self, payload_id).await;
503 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
504 res
505 }
506
507 pub async fn get_payload_v3(
515 &self,
516 payload_id: PayloadId,
517 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
518 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
519 }
520
521 pub async fn get_payload_v3_metered(
523 &self,
524 payload_id: PayloadId,
525 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
526 let start = Instant::now();
527 let res = Self::get_payload_v3(self, payload_id).await;
528 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
529 res
530 }
531
532 pub async fn get_payload_v4(
540 &self,
541 payload_id: PayloadId,
542 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
543 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
544 }
545
546 pub async fn get_payload_v4_metered(
548 &self,
549 payload_id: PayloadId,
550 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
551 let start = Instant::now();
552 let res = Self::get_payload_v4(self, payload_id).await;
553 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
554 res
555 }
556
557 pub async fn get_payload_v5(
567 &self,
568 payload_id: PayloadId,
569 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
570 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
571 }
572
573 pub async fn get_payload_v5_metered(
575 &self,
576 payload_id: PayloadId,
577 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
578 let start = Instant::now();
579 let res = Self::get_payload_v5(self, payload_id).await;
580 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
581 res
582 }
583
584 pub async fn get_payload_v6(
590 &self,
591 payload_id: PayloadId,
592 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
593 self.get_payload_inner(payload_id, EngineApiMessageVersion::V6).await
594 }
595
596 pub async fn get_payload_v6_metered(
598 &self,
599 payload_id: PayloadId,
600 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
601 let start = Instant::now();
602 let res = Self::get_payload_v6(self, payload_id).await;
603 self.inner.metrics.latency.get_payload_v6.record(start.elapsed());
604 res
605 }
606
607 pub async fn get_payload_bodies_by_range_with<F, R>(
610 &self,
611 start: BlockNumber,
612 count: u64,
613 f: F,
614 ) -> EngineApiResult<Vec<Option<R>>>
615 where
616 F: Fn(Provider::Block) -> R + Send + 'static,
617 R: Send + 'static,
618 {
619 let (tx, rx) = oneshot::channel();
620 let inner = self.inner.clone();
621
622 self.inner.task_spawner.spawn_blocking_task(async move {
623 if count > MAX_PAYLOAD_BODIES_LIMIT {
624 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
625 return;
626 }
627
628 if start == 0 || count == 0 {
629 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
630 return;
631 }
632
633 let mut result = Vec::with_capacity(count as usize);
634
635 let mut end = start.saturating_add(count - 1);
637
638 if let Ok(best_block) = inner.provider.best_block_number()
641 && end > best_block {
642 end = best_block;
643 }
644
645 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
647 for num in start..=end {
648 if num < earliest_block {
649 result.push(None);
650 continue;
651 }
652 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
653 match block_result {
654 Ok(block) => {
655 result.push(block.map(&f));
656 }
657 Err(err) => {
658 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
659 return;
660 }
661 };
662 }
663 tx.send(Ok(result)).ok();
664 });
665
666 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
667 }
668
669 pub async fn get_payload_bodies_by_range_v1(
680 &self,
681 start: BlockNumber,
682 count: u64,
683 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
684 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
685 transactions: block.body().encoded_2718_transactions(),
686 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
687 })
688 .await
689 }
690
691 pub async fn get_payload_bodies_by_range_v1_metered(
693 &self,
694 start: BlockNumber,
695 count: u64,
696 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
697 let start_time = Instant::now();
698 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
699 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
700 res
701 }
702
703 pub async fn get_payload_bodies_by_range_v2(
707 &self,
708 start: BlockNumber,
709 count: u64,
710 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
711 let mut payload_bodies = self
712 .get_payload_bodies_by_range_with(start, count, |block| {
713 let block_hash = block.header().hash_slow();
714 (
715 block_hash,
716 ExecutionPayloadBodyV2 {
717 transactions: block.body().encoded_2718_transactions(),
718 withdrawals: block
719 .body()
720 .withdrawals()
721 .cloned()
722 .map(Withdrawals::into_inner),
723 block_access_list: None,
724 },
725 )
726 })
727 .await?;
728
729 let block_hashes = payload_bodies
730 .iter()
731 .filter_map(|payload_body| payload_body.as_ref().map(|(block_hash, _)| *block_hash))
732 .collect::<Vec<_>>();
733 let block_access_lists = self.get_block_access_lists_by_hashes(block_hashes).await?;
734
735 for (payload_body, block_access_list) in
736 payload_bodies.iter_mut().filter_map(Option::as_mut).zip(block_access_lists)
737 {
738 payload_body.1.block_access_list = block_access_list;
739 }
740
741 Ok(payload_bodies
742 .into_iter()
743 .map(|payload_body| payload_body.map(|(_, payload_body)| payload_body))
744 .collect())
745 }
746
747 pub async fn get_payload_bodies_by_range_v2_metered(
749 &self,
750 start: BlockNumber,
751 count: u64,
752 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
753 let start_time = Instant::now();
754 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
755 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
756 res
757 }
758
759 pub async fn get_payload_bodies_by_hash_with<F, R>(
761 &self,
762 hashes: Vec<BlockHash>,
763 f: F,
764 ) -> EngineApiResult<Vec<Option<R>>>
765 where
766 F: Fn(Provider::Block) -> R + Send + 'static,
767 R: Send + 'static,
768 {
769 let len = hashes.len() as u64;
770 if len > MAX_PAYLOAD_BODIES_LIMIT {
771 return Err(EngineApiError::PayloadRequestTooLarge { len });
772 }
773
774 let (tx, rx) = oneshot::channel();
775 let inner = self.inner.clone();
776
777 self.inner.task_spawner.spawn_blocking_task(async move {
778 let mut result = Vec::with_capacity(hashes.len());
779 for hash in hashes {
780 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
781 match block_result {
782 Ok(block) => {
783 result.push(block.map(&f));
784 }
785 Err(err) => {
786 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
787 return;
788 }
789 }
790 }
791 tx.send(Ok(result)).ok();
792 });
793
794 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
795 }
796
797 async fn get_block_access_lists_by_hashes(
798 &self,
799 hashes: Vec<BlockHash>,
800 ) -> EngineApiResult<Vec<Option<Bytes>>> {
801 let len = hashes.len() as u64;
802 if len > MAX_PAYLOAD_BODIES_LIMIT {
803 return Err(EngineApiError::PayloadRequestTooLarge { len });
804 }
805
806 let (tx, rx) = oneshot::channel();
807 let bal_store = self.inner.provider.bal_store().clone();
808
809 self.inner.task_spawner.spawn_blocking_task(async move {
810 tx.send(
811 bal_store
812 .get_by_hashes(&hashes)
813 .map_err(|err| EngineApiError::Internal(Box::new(err))),
814 )
815 .ok();
816 });
817
818 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
819 }
820
821 pub async fn get_payload_bodies_by_hash_v1(
823 &self,
824 hashes: Vec<BlockHash>,
825 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
826 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
827 transactions: block.body().encoded_2718_transactions(),
828 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
829 })
830 .await
831 }
832
833 pub async fn get_payload_bodies_by_hash_v1_metered(
835 &self,
836 hashes: Vec<BlockHash>,
837 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
838 let start = Instant::now();
839 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
840 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
841 res
842 }
843
844 pub async fn get_payload_bodies_by_hash_v2(
848 &self,
849 hashes: Vec<BlockHash>,
850 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
851 let payload_bodies =
852 self.get_payload_bodies_by_hash_with(hashes.clone(), |block| ExecutionPayloadBodyV2 {
853 transactions: block.body().encoded_2718_transactions(),
854 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
855 block_access_list: None,
856 });
857 let block_access_lists = self.get_block_access_lists_by_hashes(hashes);
858 let (mut payload_bodies, block_access_lists) =
859 tokio::try_join!(payload_bodies, block_access_lists)?;
860
861 for (payload_body, block_access_list) in payload_bodies.iter_mut().zip(block_access_lists) {
862 if let Some(payload_body) = payload_body {
863 payload_body.block_access_list = block_access_list;
864 }
865 }
866
867 Ok(payload_bodies)
868 }
869
870 pub async fn get_payload_bodies_by_hash_v2_metered(
872 &self,
873 hashes: Vec<BlockHash>,
874 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
875 let start = Instant::now();
876 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
877 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
878 res
879 }
880
881 async fn validate_and_execute_forkchoice(
895 &self,
896 version: EngineApiMessageVersion,
897 state: ForkchoiceState,
898 payload_attrs: Option<EngineT::PayloadAttributes>,
899 ) -> EngineApiResult<ForkchoiceUpdated> {
900 if let Some(ref attrs) = payload_attrs {
901 let attr_validation_res =
902 self.inner.validator.ensure_well_formed_attributes(version, attrs);
903
904 if let Err(err) = attr_validation_res {
914 let fcu_res = self.inner.beacon_consensus.fork_choice_updated(state, None).await?;
915 if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
916 return Ok(fcu_res)
917 }
918 return Err(err.into())
919 }
920 }
921
922 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
923 }
924
925 pub fn capabilities(&self) -> &EngineCapabilities {
927 &self.inner.capabilities
928 }
929
930 fn get_blobs_v1(
931 &self,
932 versioned_hashes: Vec<B256>,
933 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
934 let current_timestamp =
936 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
937 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
938 return Err(EngineApiError::EngineObjectValidationError(
939 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
940 ));
941 }
942
943 if versioned_hashes.len() > MAX_BLOB_LIMIT {
944 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
945 }
946
947 self.inner
948 .tx_pool
949 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
950 .map_err(|err| EngineApiError::Internal(Box::new(err)))
951 }
952
953 pub fn get_blobs_v1_metered(
955 &self,
956 versioned_hashes: Vec<B256>,
957 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
958 let hashes_len = versioned_hashes.len();
959 let start = Instant::now();
960 let res = Self::get_blobs_v1(self, versioned_hashes);
961 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
962
963 if let Ok(blobs) = &res {
964 let blobs_found = blobs.iter().flatten().count();
965 let blobs_missed = hashes_len - blobs_found;
966
967 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
968 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
969 }
970
971 res
972 }
973
974 fn get_blobs_v2(
975 &self,
976 versioned_hashes: Vec<B256>,
977 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
978 let current_timestamp =
980 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
981 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
982 return Err(EngineApiError::EngineObjectValidationError(
983 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
984 ));
985 }
986
987 if versioned_hashes.len() > MAX_BLOB_LIMIT {
988 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
989 }
990
991 self.inner
992 .tx_pool
993 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
994 .map_err(|err| EngineApiError::Internal(Box::new(err)))
995 }
996
997 fn get_blobs_v3(
998 &self,
999 versioned_hashes: Vec<B256>,
1000 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1001 let current_timestamp =
1003 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
1004 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
1005 return Err(EngineApiError::EngineObjectValidationError(
1006 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1007 ));
1008 }
1009
1010 if versioned_hashes.len() > MAX_BLOB_LIMIT {
1011 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1012 }
1013
1014 if (*self.inner.is_syncing)() {
1016 return Ok(None)
1017 }
1018
1019 self.inner
1020 .tx_pool
1021 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
1022 .map(Some)
1023 .map_err(|err| EngineApiError::Internal(Box::new(err)))
1024 }
1025
1026 fn get_blobs_v4(
1027 &self,
1028 versioned_hashes: Vec<B256>,
1029 indices_bitarray: B128,
1030 ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1031 let current_timestamp =
1032 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
1033 if !self.inner.chain_spec.is_amsterdam_active_at_timestamp(current_timestamp) {
1034 return Err(EngineApiError::EngineObjectValidationError(
1035 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1036 ));
1037 }
1038
1039 if versioned_hashes.len() > MAX_BLOB_LIMIT {
1040 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1041 }
1042
1043 if (*self.inner.is_syncing)() {
1045 return Ok(None)
1046 }
1047
1048 self.inner
1049 .tx_pool
1050 .get_blobs_for_versioned_hashes_v4(&versioned_hashes, indices_bitarray)
1051 .map(Some)
1052 .map_err(|err| EngineApiError::Internal(Box::new(err)))
1053 }
1054
1055 pub fn get_blobs_v2_metered(
1057 &self,
1058 versioned_hashes: Vec<B256>,
1059 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
1060 let hashes_len = versioned_hashes.len();
1061 let start = Instant::now();
1062 let res = Self::get_blobs_v2(self, versioned_hashes);
1063 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
1064
1065 if let Ok(blobs) = &res {
1066 let blobs_found = blobs.iter().flatten().count();
1067
1068 self.inner
1069 .metrics
1070 .blob_metrics
1071 .get_blobs_requests_blobs_total
1072 .increment(hashes_len as u64);
1073 self.inner
1074 .metrics
1075 .blob_metrics
1076 .get_blobs_requests_blobs_in_blobpool_total
1077 .increment(blobs_found as u64);
1078
1079 if blobs_found == hashes_len {
1080 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
1081 } else {
1082 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1083 }
1084 } else {
1085 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1086 }
1087
1088 res
1089 }
1090
1091 pub fn get_blobs_v3_metered(
1093 &self,
1094 versioned_hashes: Vec<B256>,
1095 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1096 let hashes_len = versioned_hashes.len();
1097 let start = Instant::now();
1098 let res = Self::get_blobs_v3(self, versioned_hashes);
1099 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
1100
1101 if let Ok(Some(blobs)) = &res {
1102 let blobs_found = blobs.iter().flatten().count();
1103 let blobs_missed = hashes_len - blobs_found;
1104
1105 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1106 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1107 }
1108
1109 res
1110 }
1111
1112 pub fn get_blobs_v4_metered(
1114 &self,
1115 versioned_hashes: Vec<B256>,
1116 indices_bitarray: B128,
1117 ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1118 let hashes_len = versioned_hashes.len();
1119 let start = Instant::now();
1120 let res = Self::get_blobs_v4(self, versioned_hashes, indices_bitarray);
1121 self.inner.metrics.latency.get_blobs_v4.record(start.elapsed());
1122
1123 if let Ok(Some(blobs)) = &res {
1124 let blobs_found = blobs.iter().flatten().count();
1125 let blobs_missed = hashes_len - blobs_found;
1126
1127 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1128 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1129 }
1130
1131 res
1132 }
1133}
1134
1135#[async_trait]
1137impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
1138 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1139where
1140 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
1141 EngineT: EngineTypes<ExecutionData = ExecutionData>,
1142 Pool: TransactionPool + 'static,
1143 Validator: EngineApiValidator<EngineT>,
1144 ChainSpec: EthereumHardforks + Send + Sync + 'static,
1145{
1146 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
1150 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
1151 let payload =
1152 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
1153 Ok(self.new_payload_v1_metered(payload).await?)
1154 }
1155
1156 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
1159 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
1160 let payload = ExecutionData {
1161 payload: payload.into_payload(),
1162 sidecar: ExecutionPayloadSidecar::none(),
1163 };
1164
1165 Ok(self.new_payload_v2_metered(payload).await?)
1166 }
1167
1168 async fn new_payload_v3(
1171 &self,
1172 payload: ExecutionPayloadV3,
1173 versioned_hashes: Vec<B256>,
1174 parent_beacon_block_root: B256,
1175 ) -> RpcResult<PayloadStatus> {
1176 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
1177 let payload = ExecutionData {
1178 payload: payload.into(),
1179 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
1180 versioned_hashes,
1181 parent_beacon_block_root,
1182 }),
1183 };
1184
1185 Ok(self.new_payload_v3_metered(payload).await?)
1186 }
1187
1188 async fn new_payload_v4(
1191 &self,
1192 payload: ExecutionPayloadV3,
1193 versioned_hashes: Vec<B256>,
1194 parent_beacon_block_root: B256,
1195 requests: RequestsOrHash,
1196 ) -> RpcResult<PayloadStatus> {
1197 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1198
1199 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1201 return Err(EngineApiError::UnexpectedRequestsHash.into());
1202 }
1203
1204 let payload = ExecutionData {
1205 payload: payload.into(),
1206 sidecar: ExecutionPayloadSidecar::v4(
1207 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1208 PraguePayloadFields { requests },
1209 ),
1210 };
1211
1212 Ok(self.new_payload_v4_metered(payload).await?)
1213 }
1214
1215 async fn new_payload_v5(
1221 &self,
1222 payload: ExecutionPayloadV4,
1223 versioned_hashes: Vec<B256>,
1224 parent_beacon_block_root: B256,
1225 requests: RequestsOrHash,
1226 ) -> RpcResult<PayloadStatus> {
1227 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1228 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1230 return Err(EngineApiError::UnexpectedRequestsHash.into());
1231 }
1232
1233 let payload = ExecutionData {
1234 payload: payload.into(),
1235 sidecar: ExecutionPayloadSidecar::v4(
1236 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1237 PraguePayloadFields { requests },
1238 ),
1239 };
1240
1241 Ok(self.new_payload_v5_metered(payload).await?)
1242 }
1243
1244 async fn fork_choice_updated_v1(
1249 &self,
1250 fork_choice_state: ForkchoiceState,
1251 payload_attributes: Option<EngineT::PayloadAttributes>,
1252 ) -> RpcResult<ForkchoiceUpdated> {
1253 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1254 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1255 }
1256
1257 async fn fork_choice_updated_v2(
1260 &self,
1261 fork_choice_state: ForkchoiceState,
1262 payload_attributes: Option<EngineT::PayloadAttributes>,
1263 ) -> RpcResult<ForkchoiceUpdated> {
1264 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1265 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1266 }
1267
1268 async fn fork_choice_updated_v3(
1272 &self,
1273 fork_choice_state: ForkchoiceState,
1274 payload_attributes: Option<EngineT::PayloadAttributes>,
1275 ) -> RpcResult<ForkchoiceUpdated> {
1276 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1277 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1278 }
1279
1280 async fn fork_choice_updated_v4(
1284 &self,
1285 fork_choice_state: ForkchoiceState,
1286 payload_attributes: Option<EngineT::PayloadAttributes>,
1287 custody_columns: Option<B128>,
1288 ) -> RpcResult<ForkchoiceUpdated> {
1289 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV4");
1290 Ok(self
1291 .fork_choice_updated_v4_metered(fork_choice_state, payload_attributes, custody_columns)
1292 .await?)
1293 }
1294
1295 async fn get_payload_v1(
1307 &self,
1308 payload_id: PayloadId,
1309 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1310 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1311 Ok(self.get_payload_v1_metered(payload_id).await?)
1312 }
1313
1314 async fn get_payload_v2(
1324 &self,
1325 payload_id: PayloadId,
1326 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1327 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1328 Ok(self.get_payload_v2_metered(payload_id).await?)
1329 }
1330
1331 async fn get_payload_v3(
1341 &self,
1342 payload_id: PayloadId,
1343 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1344 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1345 Ok(self.get_payload_v3_metered(payload_id).await?)
1346 }
1347
1348 async fn get_payload_v4(
1358 &self,
1359 payload_id: PayloadId,
1360 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1361 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1362 Ok(self.get_payload_v4_metered(payload_id).await?)
1363 }
1364
1365 async fn get_payload_v5(
1375 &self,
1376 payload_id: PayloadId,
1377 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1378 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1379 Ok(self.get_payload_v5_metered(payload_id).await?)
1380 }
1381
1382 async fn get_payload_v6(
1388 &self,
1389 payload_id: PayloadId,
1390 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1391 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1392 Ok(self.get_payload_v6_metered(payload_id).await?)
1393 }
1394
1395 async fn get_payload_bodies_by_hash_v1(
1398 &self,
1399 block_hashes: Vec<BlockHash>,
1400 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1401 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1402 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1403 }
1404
1405 async fn get_payload_bodies_by_hash_v2(
1411 &self,
1412 block_hashes: Vec<BlockHash>,
1413 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1414 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1415 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1416 }
1417
1418 async fn get_payload_bodies_by_range_v1(
1435 &self,
1436 start: U64,
1437 count: U64,
1438 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1439 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1440 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1441 }
1442
1443 async fn get_payload_bodies_by_range_v2(
1449 &self,
1450 start: U64,
1451 count: U64,
1452 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1453 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1454 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1455 }
1456
1457 async fn get_client_version_v1(
1461 &self,
1462 client: ClientVersionV1,
1463 ) -> RpcResult<Vec<ClientVersionV1>> {
1464 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1465 Ok(Self::get_client_version_v1(self, client)?)
1466 }
1467
1468 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1471 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1472
1473 let el_caps = self.capabilities();
1474 el_caps.log_capability_mismatches(&capabilities);
1475
1476 Ok(el_caps.list())
1477 }
1478
1479 async fn get_blobs_v1(
1480 &self,
1481 versioned_hashes: Vec<B256>,
1482 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1483 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1484 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1485 }
1486
1487 async fn get_blobs_v2(
1488 &self,
1489 versioned_hashes: Vec<B256>,
1490 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1491 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1492 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1493 }
1494
1495 async fn get_blobs_v3(
1496 &self,
1497 versioned_hashes: Vec<B256>,
1498 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1499 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1500 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1501 }
1502
1503 async fn get_blobs_v4(
1504 &self,
1505 versioned_hashes: Vec<B256>,
1506 indices_bitarray: B128,
1507 ) -> RpcResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1508 trace!(target: "rpc::engine", "Serving engine_getBlobsV4");
1509 Ok(self.get_blobs_v4_metered(versioned_hashes, indices_bitarray)?)
1510 }
1511}
1512
1513impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1514 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1515where
1516 EngineT: EngineTypes,
1517 Self: EngineApiServer<EngineT>,
1518{
1519 fn into_rpc_module(self) -> RpcModule<()> {
1520 EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1521 }
1522}
1523
1524impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1525 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1526where
1527 PayloadT: PayloadTypes,
1528{
1529 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1530 f.debug_struct("EngineApi").finish_non_exhaustive()
1531 }
1532}
1533
1534impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1535 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1536where
1537 PayloadT: PayloadTypes,
1538{
1539 fn clone(&self) -> Self {
1540 Self { inner: Arc::clone(&self.inner) }
1541 }
1542}
1543
1544struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
1546 provider: Provider,
1548 chain_spec: Arc<ChainSpec>,
1550 beacon_consensus: ConsensusEngineHandle<PayloadT>,
1552 payload_store: PayloadStore<PayloadT>,
1554 task_spawner: Runtime,
1556 metrics: EngineApiMetrics,
1558 client: ClientVersionV1,
1560 capabilities: EngineCapabilities,
1562 tx_pool: Pool,
1564 validator: Validator,
1566 accept_execution_requests_hash: bool,
1567 cell_custody: CellCustody,
1569 is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
1571}
1572
1573#[cfg(test)]
1574mod tests {
1575 use super::*;
1576 use alloy_eips::{eip7685::Requests, NumHash};
1577 use alloy_primitives::{keccak256, Address, Bytes, Sealed, B256};
1578 use alloy_rpc_types_engine::{
1579 ClientCode, ClientVersionV1, ExecutionPayloadV2, PayloadAttributes, PayloadStatusEnum,
1580 };
1581 use assert_matches::assert_matches;
1582 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1583 use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1584 use reth_ethereum_engine_primitives::EthEngineTypes;
1585 use reth_ethereum_primitives::Block;
1586 use reth_network_api::{
1587 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1588 };
1589 use reth_node_ethereum::EthereumEngineValidator;
1590 use reth_payload_builder::test_utils::spawn_test_payload_service;
1591 use reth_provider::{test_utils::MockEthProvider, BalStoreHandle, InMemoryBalStore};
1592 use reth_tasks::Runtime;
1593 use reth_transaction_pool::noop::NoopTransactionPool;
1594 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1595
1596 fn setup_engine_api() -> (
1597 EngineApiTestHandle,
1598 EngineApi<
1599 Arc<MockEthProvider>,
1600 EthEngineTypes,
1601 NoopTransactionPool,
1602 EthereumEngineValidator,
1603 ChainSpec,
1604 >,
1605 ) {
1606 let client = ClientVersionV1 {
1607 code: ClientCode::RH,
1608 name: "Reth".to_string(),
1609 version: "v0.2.0-beta.5".to_string(),
1610 commit: "defa64b2".to_string(),
1611 };
1612
1613 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1614 let provider = Arc::new(MockEthProvider::default());
1615 let payload_store = spawn_test_payload_service();
1616 let (to_engine, engine_rx) = unbounded_channel();
1617 let task_executor = Runtime::test();
1618 let api = EngineApi::new(
1619 provider.clone(),
1620 chain_spec.clone(),
1621 ConsensusEngineHandle::new(to_engine),
1622 payload_store.into(),
1623 NoopTransactionPool::default(),
1624 task_executor,
1625 client,
1626 EngineCapabilities::default(),
1627 EthereumEngineValidator::new(chain_spec.clone()),
1628 false,
1629 NoopNetwork::default(),
1630 );
1631 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1632 (handle, api)
1633 }
1634
1635 #[tokio::test]
1636 async fn engine_client_version_v1() {
1637 let client = ClientVersionV1 {
1638 code: ClientCode::RH,
1639 name: "Reth".to_string(),
1640 version: "v0.2.0-beta.5".to_string(),
1641 commit: "defa64b2".to_string(),
1642 };
1643 let (_, api) = setup_engine_api();
1644 let res = api.get_client_version_v1(client.clone());
1645 assert_eq!(res.unwrap(), vec![client]);
1646 }
1647
1648 #[tokio::test]
1649 async fn get_payload_bodies_by_hash_v2_returns_block_access_list_from_store() {
1650 let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1651 let mut provider = MockEthProvider::default();
1652 provider.bal_store = bal_store.clone();
1653 let provider = Arc::new(provider);
1654
1655 let client = ClientVersionV1 {
1656 code: ClientCode::RH,
1657 name: "Reth".to_string(),
1658 version: "v0.2.0-beta.5".to_string(),
1659 commit: "defa64b2".to_string(),
1660 };
1661 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1662 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1663 let (to_engine, _engine_rx) = unbounded_channel();
1664 let api = EngineApi::new(
1665 provider.clone(),
1666 chain_spec.clone(),
1667 ConsensusEngineHandle::new(to_engine),
1668 payload_store.into(),
1669 NoopTransactionPool::default(),
1670 Runtime::test(),
1671 client,
1672 EngineCapabilities::default(),
1673 EthereumEngineValidator::new(chain_spec),
1674 false,
1675 NoopNetwork::default(),
1676 );
1677
1678 let mut block = Block::default();
1679 block.header.number = 1;
1680 let block_hash = block.header.hash_slow();
1681 provider.add_block(block_hash, block);
1682
1683 let mut block_without_bal = Block::default();
1684 block_without_bal.header.number = 2;
1685 let block_without_bal_hash = block_without_bal.header.hash_slow();
1686 provider.add_block(block_without_bal_hash, block_without_bal);
1687
1688 let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1689 let sealed_bal = Sealed::new_unchecked(raw_bal.clone(), keccak256(&raw_bal));
1690 bal_store.insert(NumHash::new(1, block_hash), sealed_bal).unwrap();
1691
1692 let missing_hash = B256::with_last_byte(3);
1693 let response = api
1694 .get_payload_bodies_by_hash_v2(vec![block_hash, block_without_bal_hash, missing_hash])
1695 .await
1696 .unwrap();
1697
1698 assert_eq!(response.len(), 3);
1699 assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1700 assert_eq!(response[1].as_ref().unwrap().block_access_list, None);
1701 assert!(response[2].is_none());
1702 }
1703
1704 #[tokio::test]
1705 async fn get_payload_bodies_by_range_v2_returns_block_access_lists_from_store() {
1706 let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1707 let mut provider = MockEthProvider::default();
1708 provider.bal_store = bal_store.clone();
1709 let provider = Arc::new(provider);
1710
1711 let client = ClientVersionV1 {
1712 code: ClientCode::RH,
1713 name: "Reth".to_string(),
1714 version: "v0.2.0-beta.5".to_string(),
1715 commit: "defa64b2".to_string(),
1716 };
1717 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1718 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1719 let (to_engine, _engine_rx) = unbounded_channel();
1720 let api = EngineApi::new(
1721 provider.clone(),
1722 chain_spec.clone(),
1723 ConsensusEngineHandle::new(to_engine),
1724 payload_store.into(),
1725 NoopTransactionPool::default(),
1726 Runtime::test(),
1727 client,
1728 EngineCapabilities::default(),
1729 EthereumEngineValidator::new(chain_spec),
1730 false,
1731 NoopNetwork::default(),
1732 );
1733
1734 let mut block = Block::default();
1735 block.header.number = 1;
1736 let block_hash = block.header.hash_slow();
1737 provider.add_block(block_hash, block);
1738
1739 let mut block_without_bal = Block::default();
1740 block_without_bal.header.number = 3;
1741 let block_without_bal_hash = block_without_bal.header.hash_slow();
1742 provider.add_block(block_without_bal_hash, block_without_bal);
1743
1744 let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1745 let sealed_bal = Sealed::new_unchecked(raw_bal.clone(), keccak256(&raw_bal));
1746 bal_store.insert(NumHash::new(1, block_hash), sealed_bal).unwrap();
1747
1748 let response = api.get_payload_bodies_by_range_v2(1, 3).await.unwrap();
1749
1750 assert_eq!(response.len(), 3);
1751 assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1752 assert!(response[1].is_none());
1753 assert_eq!(response[2].as_ref().unwrap().block_access_list, None);
1754 }
1755
1756 struct EngineApiTestHandle {
1757 #[allow(dead_code)]
1758 chain_spec: Arc<ChainSpec>,
1759 provider: Arc<MockEthProvider>,
1760 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1761 }
1762
1763 #[tokio::test]
1764 async fn forwards_responses_to_consensus_engine() {
1765 let (mut handle, api) = setup_engine_api();
1766
1767 tokio::spawn(async move {
1768 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1769 let execution_data = ExecutionData {
1770 payload: payload_v1.into(),
1771 sidecar: ExecutionPayloadSidecar::none(),
1772 };
1773
1774 api.new_payload_v1(execution_data).await.unwrap();
1775 });
1776 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1777 }
1778
1779 #[tokio::test]
1780 async fn new_payload_v5_accepts_amsterdam_payloads() {
1781 let chain_spec = Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1782 let provider = Arc::new(MockEthProvider::default());
1783 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1784 let (to_engine, mut engine_rx) = unbounded_channel();
1785
1786 let api = EngineApi::new(
1787 provider,
1788 chain_spec.clone(),
1789 ConsensusEngineHandle::new(to_engine),
1790 payload_store.into(),
1791 NoopTransactionPool::default(),
1792 Runtime::test(),
1793 ClientVersionV1 {
1794 code: ClientCode::RH,
1795 name: "Reth".to_string(),
1796 version: "v0.0.0-test".to_string(),
1797 commit: "test".to_string(),
1798 },
1799 EngineCapabilities::default(),
1800 EthereumEngineValidator::new(chain_spec),
1801 false,
1802 NoopNetwork::default(),
1803 );
1804
1805 tokio::spawn(async move {
1806 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1807 let payload = ExecutionPayloadV4 {
1808 payload_inner: ExecutionPayloadV3 {
1809 payload_inner: ExecutionPayloadV2 {
1810 payload_inner: payload_v1,
1811 withdrawals: Vec::new(),
1812 },
1813 blob_gas_used: 0,
1814 excess_blob_gas: 0,
1815 },
1816 block_access_list: Bytes::from_static(b"bal"),
1817 slot_number: 1,
1818 };
1819 let execution_data = ExecutionData {
1820 payload: payload.into(),
1821 sidecar: ExecutionPayloadSidecar::v4(
1822 CancunPayloadFields {
1823 versioned_hashes: Vec::new(),
1824 parent_beacon_block_root: B256::ZERO,
1825 },
1826 PraguePayloadFields { requests: RequestsOrHash::Requests(Requests::default()) },
1827 ),
1828 };
1829
1830 api.new_payload_v5(execution_data).await.unwrap();
1831 });
1832
1833 assert_matches!(engine_rx.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1834 }
1835
1836 #[derive(Clone)]
1837 struct TestNetworkInfo {
1838 syncing: bool,
1839 }
1840
1841 impl NetworkInfo for TestNetworkInfo {
1842 fn local_addr(&self) -> std::net::SocketAddr {
1843 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1844 }
1845
1846 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1847 #[allow(deprecated)]
1848 Ok(NetworkStatus {
1849 client_version: "test".to_string(),
1850 protocol_version: 5,
1851 eth_protocol_info: EthProtocolInfo {
1852 network: 1,
1853 difficulty: None,
1854 genesis: Default::default(),
1855 config: Default::default(),
1856 head: Default::default(),
1857 },
1858 capabilities: vec![],
1859 })
1860 }
1861
1862 fn chain_id(&self) -> u64 {
1863 1
1864 }
1865
1866 fn cell_custody(&self) -> &CellCustody {
1867 static CELL_CUSTODY: std::sync::OnceLock<CellCustody> = std::sync::OnceLock::new();
1868 CELL_CUSTODY.get_or_init(CellCustody::default)
1869 }
1870
1871 fn is_syncing(&self) -> bool {
1872 self.syncing
1873 }
1874
1875 fn is_initially_syncing(&self) -> bool {
1876 self.syncing
1877 }
1878 }
1879
1880 #[tokio::test]
1881 async fn get_blobs_v3_returns_null_when_syncing() {
1882 let chain_spec: Arc<ChainSpec> =
1883 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1884 let provider = Arc::new(MockEthProvider::default());
1885 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1886 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1887
1888 let api = EngineApi::new(
1889 provider,
1890 chain_spec.clone(),
1891 ConsensusEngineHandle::new(to_engine),
1892 payload_store.into(),
1893 NoopTransactionPool::default(),
1894 Runtime::test(),
1895 ClientVersionV1 {
1896 code: ClientCode::RH,
1897 name: "Reth".to_string(),
1898 version: "v0.0.0-test".to_string(),
1899 commit: "test".to_string(),
1900 },
1901 EngineCapabilities::default(),
1902 EthereumEngineValidator::new(chain_spec),
1903 false,
1904 TestNetworkInfo { syncing: true },
1905 );
1906
1907 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1908 assert_matches!(res, Ok(None));
1909 }
1910
1911 #[tokio::test]
1912 async fn get_blobs_v4_returns_null_when_syncing() {
1913 let chain_spec: Arc<ChainSpec> =
1914 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1915 let provider = Arc::new(MockEthProvider::default());
1916 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1917 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1918
1919 let api = EngineApi::new(
1920 provider,
1921 chain_spec.clone(),
1922 ConsensusEngineHandle::new(to_engine),
1923 payload_store.into(),
1924 NoopTransactionPool::default(),
1925 Runtime::test(),
1926 ClientVersionV1 {
1927 code: ClientCode::RH,
1928 name: "Reth".to_string(),
1929 version: "v0.0.0-test".to_string(),
1930 commit: "test".to_string(),
1931 },
1932 EngineCapabilities::default(),
1933 EthereumEngineValidator::new(chain_spec),
1934 false,
1935 TestNetworkInfo { syncing: true },
1936 );
1937
1938 let res = api.get_blobs_v4_metered(vec![B256::ZERO], B128::from(1u128));
1939 assert_matches!(res, Ok(None));
1940 }
1941
1942 #[tokio::test]
1943 async fn fcu_v4_updates_shared_cell_custody_before_forkchoice_result() {
1944 let chain_spec: Arc<ChainSpec> =
1945 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1946 let provider = Arc::new(MockEthProvider::default());
1947 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1948 let (to_engine, mut engine_rx) = unbounded_channel();
1949 let network = NoopNetwork::default();
1950 let cell_custody = network.cell_custody().clone();
1951
1952 let api = EngineApi::new(
1953 provider,
1954 chain_spec.clone(),
1955 ConsensusEngineHandle::new(to_engine),
1956 payload_store.into(),
1957 NoopTransactionPool::default(),
1958 Runtime::test(),
1959 ClientVersionV1 {
1960 code: ClientCode::RH,
1961 name: "Reth".to_string(),
1962 version: "v0.0.0-test".to_string(),
1963 commit: "test".to_string(),
1964 },
1965 EngineCapabilities::default(),
1966 EthereumEngineValidator::new(chain_spec),
1967 false,
1968 network,
1969 );
1970
1971 let state = ForkchoiceState {
1972 head_block_hash: B256::from([0x33; 32]),
1973 safe_block_hash: B256::ZERO,
1974 finalized_block_hash: B256::ZERO,
1975 };
1976 let custody_columns = B128::from(0b1010u128);
1977
1978 let api_task = tokio::spawn(async move {
1979 api.fork_choice_updated_v4(state, None, Some(custody_columns)).await
1980 });
1981
1982 let request = tokio::time::timeout(std::time::Duration::from_secs(1), engine_rx.recv())
1983 .await
1984 .expect("timed out waiting for forkchoiceUpdated request")
1985 .expect("expected forkchoiceUpdated request");
1986 let response_tx = match request {
1987 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1988 assert!(payload_attrs.is_none());
1989 tx
1990 }
1991 other => panic!("unexpected engine message: {other:?}"),
1992 };
1993 assert_eq!(cell_custody.get(), custody_columns);
1994
1995 response_tx
1996 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1997 PayloadStatusEnum::Valid,
1998 ))))
1999 .expect("send valid response");
2000
2001 api_task
2002 .await
2003 .expect("api task should not panic")
2004 .expect("forkchoiceUpdatedV4 should succeed");
2005 assert_eq!(cell_custody.get(), custody_columns);
2006 }
2007
2008 #[tokio::test]
2009 async fn fcu_v4_updates_shared_cell_custody_when_payload_attrs_invalid() {
2010 let chain_spec: Arc<ChainSpec> =
2011 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
2012 let provider = Arc::new(MockEthProvider::default());
2013 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
2014 let (to_engine, mut engine_rx) = unbounded_channel();
2015 let network = NoopNetwork::default();
2016 let cell_custody = network.cell_custody().clone();
2017
2018 let api = EngineApi::new(
2019 provider,
2020 chain_spec.clone(),
2021 ConsensusEngineHandle::new(to_engine),
2022 payload_store.into(),
2023 NoopTransactionPool::default(),
2024 Runtime::test(),
2025 ClientVersionV1 {
2026 code: ClientCode::RH,
2027 name: "Reth".to_string(),
2028 version: "v0.0.0-test".to_string(),
2029 commit: "test".to_string(),
2030 },
2031 EngineCapabilities::default(),
2032 EthereumEngineValidator::new(chain_spec),
2033 false,
2034 network,
2035 );
2036
2037 let state = ForkchoiceState {
2038 head_block_hash: B256::from([0x44; 32]),
2039 safe_block_hash: B256::ZERO,
2040 finalized_block_hash: B256::ZERO,
2041 };
2042 let payload_attributes = PayloadAttributes {
2043 timestamp: 1,
2044 prev_randao: B256::ZERO,
2045 suggested_fee_recipient: Address::ZERO,
2046 withdrawals: Some(vec![]),
2047 parent_beacon_block_root: None,
2048 slot_number: None,
2049 };
2050 let custody_columns = B128::from(0b1010u128);
2051
2052 let api_task = tokio::spawn(async move {
2053 api.fork_choice_updated_v4(state, Some(payload_attributes), Some(custody_columns)).await
2054 });
2055
2056 let request = tokio::time::timeout(std::time::Duration::from_secs(1), engine_rx.recv())
2057 .await
2058 .expect("timed out waiting for forkchoiceUpdated request")
2059 .expect("expected forkchoiceUpdated request");
2060 let response_tx = match request {
2061 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2062 assert!(
2063 payload_attrs.is_none(),
2064 "when attrs are invalid, API should first evaluate forkchoice without attrs"
2065 );
2066 tx
2067 }
2068 other => panic!("unexpected engine message: {other:?}"),
2069 };
2070 assert_eq!(cell_custody.get(), custody_columns);
2071
2072 response_tx
2073 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2074 PayloadStatusEnum::Valid,
2075 ))))
2076 .expect("send valid response");
2077
2078 let response = api_task.await.expect("api task should not panic");
2079 assert_matches!(
2080 response,
2081 Err(EngineApiError::EngineObjectValidationError(
2082 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
2083 ))
2084 );
2085 }
2086
2087 #[tokio::test]
2088 async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
2089 let (mut handle, api) = setup_engine_api();
2090
2091 let state = ForkchoiceState {
2092 head_block_hash: B256::from([0x11; 32]),
2093 safe_block_hash: B256::ZERO,
2094 finalized_block_hash: B256::ZERO,
2095 };
2096 let payload_attributes = PayloadAttributes {
2097 timestamp: 1,
2098 prev_randao: B256::ZERO,
2099 suggested_fee_recipient: Address::ZERO,
2100 withdrawals: Some(vec![]),
2101 parent_beacon_block_root: None,
2103 slot_number: None,
2104 };
2105
2106 let api_task = tokio::spawn(async move {
2107 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
2108 });
2109
2110 let request =
2111 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
2112 .await
2113 .expect("timed out waiting for forkchoiceUpdated request")
2114 .expect("expected forkchoiceUpdated request");
2115 let response_tx = match request {
2116 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2117 assert!(
2118 payload_attrs.is_none(),
2119 "FCU for syncing state should be evaluated before payload attributes"
2120 );
2121 tx
2122 }
2123 other => panic!("unexpected engine message: {other:?}"),
2124 };
2125
2126 response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
2127
2128 let response = api_task
2129 .await
2130 .expect("api task should not panic")
2131 .expect("forkchoiceUpdatedV3 should return a syncing response");
2132 assert!(response.payload_status.is_syncing());
2133 assert!(response.payload_id.is_none());
2134 }
2135
2136 #[tokio::test]
2137 async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
2138 let (mut handle, api) = setup_engine_api();
2139
2140 let state = ForkchoiceState {
2141 head_block_hash: B256::from([0x22; 32]),
2142 safe_block_hash: B256::ZERO,
2143 finalized_block_hash: B256::ZERO,
2144 };
2145 let payload_attributes = PayloadAttributes {
2146 timestamp: 1,
2147 prev_randao: B256::ZERO,
2148 suggested_fee_recipient: Address::ZERO,
2149 withdrawals: Some(vec![]),
2150 parent_beacon_block_root: None,
2151 slot_number: None,
2152 };
2153
2154 let api_task = tokio::spawn(async move {
2155 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
2156 });
2157
2158 let request =
2159 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
2160 .await
2161 .expect("timed out waiting for forkchoiceUpdated request")
2162 .expect("expected forkchoiceUpdated request");
2163
2164 let response_tx = match request {
2165 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
2166 assert!(
2167 payload_attrs.is_none(),
2168 "when attrs are invalid, API should first evaluate forkchoice without attrs"
2169 );
2170 tx
2171 }
2172 other => panic!("unexpected engine message: {other:?}"),
2173 };
2174
2175 response_tx
2176 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
2177 PayloadStatusEnum::Valid,
2178 ))))
2179 .expect("send valid response");
2180
2181 let response = api_task.await.expect("api task should not panic");
2182 assert_matches!(
2183 response,
2184 Err(EngineApiError::EngineObjectValidationError(
2185 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
2186 ))
2187 );
2188
2189 match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
2190 .await
2191 {
2192 Err(_) | Ok(None) => {}
2193 Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
2194 panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
2195 }
2196 Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
2197 }
2198 }
2199
2200 mod get_payload_bodies {
2202 use super::*;
2203 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
2204 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
2205
2206 #[tokio::test]
2207 async fn invalid_params() {
2208 let (_, api) = setup_engine_api();
2209
2210 let by_range_tests = [
2211 (0, 0),
2213 (0, 1),
2214 (1, 0),
2215 ];
2216
2217 for (start, count) in by_range_tests {
2219 let res = api.get_payload_bodies_by_range_v1(start, count).await;
2220 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
2221 }
2222 }
2223
2224 #[tokio::test]
2225 async fn request_too_large() {
2226 let (_, api) = setup_engine_api();
2227
2228 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
2229 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
2230 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
2231 }
2232
2233 #[tokio::test]
2234 async fn returns_payload_bodies() {
2235 let mut rng = generators::rng();
2236 let (handle, api) = setup_engine_api();
2237
2238 let (start, count) = (1, 10);
2239 let blocks = random_block_range(
2240 &mut rng,
2241 start..=start + count - 1,
2242 BlockRangeParams { tx_count: 0..2, ..Default::default() },
2243 );
2244 handle
2245 .provider
2246 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
2247
2248 let expected = blocks
2249 .iter()
2250 .cloned()
2251 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
2252 .collect::<Vec<_>>();
2253
2254 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2255 assert_eq!(res, expected);
2256 }
2257
2258 #[tokio::test]
2259 async fn returns_payload_bodies_with_gaps() {
2260 let mut rng = generators::rng();
2261 let (handle, api) = setup_engine_api();
2262
2263 let (start, count) = (1, 100);
2264 let blocks = random_block_range(
2265 &mut rng,
2266 start..=start + count - 1,
2267 BlockRangeParams { tx_count: 0..2, ..Default::default() },
2268 );
2269
2270 let first_missing_range = 26..=50;
2272 let second_missing_range = 76..=100;
2273 handle.provider.extend_blocks(
2274 blocks
2275 .iter()
2276 .filter(|b| {
2277 !first_missing_range.contains(&b.number) &&
2278 !second_missing_range.contains(&b.number)
2279 })
2280 .map(|b| (b.hash(), b.clone().into_block())),
2281 );
2282
2283 let expected = blocks
2284 .iter()
2285 .filter(|b| !second_missing_range.contains(&b.number))
2288 .cloned()
2289 .map(|b| {
2290 if first_missing_range.contains(&b.number) {
2291 None
2292 } else {
2293 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2294 }
2295 })
2296 .collect::<Vec<_>>();
2297
2298 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2299 assert_eq!(res, expected);
2300
2301 let expected = blocks
2302 .iter()
2303 .cloned()
2304 .map(|b| {
2307 if first_missing_range.contains(&b.number) ||
2308 second_missing_range.contains(&b.number)
2309 {
2310 None
2311 } else {
2312 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2313 }
2314 })
2315 .collect::<Vec<_>>();
2316
2317 let hashes = blocks.iter().map(|b| b.hash()).collect();
2318 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
2319 assert_eq!(res, expected);
2320 }
2321 }
2322}