1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, Bytes, B128, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2,
14 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
15 ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
16 PraguePayloadFields,
17};
18use async_trait::async_trait;
19use jsonrpsee_core::{server::RpcModule, RpcResult};
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes};
22use reth_network_api::NetworkInfo;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, MessageValidationKind,
26 PayloadOrAttributes, PayloadTypes,
27};
28use reth_primitives_traits::{Block, BlockBody};
29use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
30use reth_storage_api::{BalProvider, BlockReader, HeaderProvider, StateProviderFactory};
31use reth_tasks::Runtime;
32use reth_transaction_pool::TransactionPool;
33use std::{
34 sync::Arc,
35 time::{Instant, SystemTime},
36};
37use tokio::sync::oneshot;
38use tracing::{debug, trace, warn};
39
40pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
/// Largest number of payload bodies a single by-range or by-hash request may ask for; larger
/// requests are rejected with `EngineApiError::PayloadRequestTooLarge`.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1_024;
45
/// Largest number of versioned hashes a single `get_blobs_*` request may carry; larger requests
/// are rejected with `EngineApiError::BlobRequestTooLarge`.
const MAX_BLOB_LIMIT: usize = 128;
48
49pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
65 inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
66}
67
68impl<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec>
69 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
70{
71 pub fn chain_spec(&self) -> &Arc<ChainSpec> {
73 &self.inner.chain_spec
74 }
75}
76
77impl<Provider, PayloadT, Pool, Validator, ChainSpec>
78 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
79where
80 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
81 PayloadT: PayloadTypes,
82 Pool: TransactionPool + 'static,
83 Validator: EngineApiValidator<PayloadT>,
84 ChainSpec: EthereumHardforks + Send + Sync + 'static,
85{
86 #[expect(clippy::too_many_arguments)]
88 pub fn new(
89 provider: Provider,
90 chain_spec: Arc<ChainSpec>,
91 beacon_consensus: ConsensusEngineHandle<PayloadT>,
92 payload_store: PayloadStore<PayloadT>,
93 tx_pool: Pool,
94 task_spawner: Runtime,
95 client: ClientVersionV1,
96 capabilities: EngineCapabilities,
97 validator: Validator,
98 accept_execution_requests_hash: bool,
99 network: impl NetworkInfo + 'static,
100 ) -> Self {
101 let is_syncing = Arc::new(move || network.is_syncing());
102 let inner = Arc::new(EngineApiInner {
103 provider,
104 chain_spec,
105 beacon_consensus,
106 payload_store,
107 task_spawner,
108 metrics: EngineApiMetrics::default(),
109 client,
110 capabilities,
111 tx_pool,
112 validator,
113 accept_execution_requests_hash,
114 is_syncing,
115 });
116 Self { inner }
117 }
118
119 pub fn get_client_version_v1(
121 &self,
122 _client: ClientVersionV1,
123 ) -> EngineApiResult<Vec<ClientVersionV1>> {
124 Ok(vec![self.inner.client.clone()])
125 }
126
127 async fn get_payload_timestamp(&self, payload_id: PayloadId) -> EngineApiResult<u64> {
129 Ok(self
130 .inner
131 .payload_store
132 .payload_timestamp(payload_id)
133 .await
134 .ok_or(EngineApiError::UnknownPayload)??)
135 }
136
137 pub async fn new_payload_v1(
140 &self,
141 payload: PayloadT::ExecutionData,
142 ) -> EngineApiResult<PayloadStatus> {
143 let payload_or_attrs = PayloadOrAttributes::<
144 '_,
145 PayloadT::ExecutionData,
146 PayloadT::PayloadAttributes,
147 >::from_execution_payload(&payload);
148
149 self.inner
150 .validator
151 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
154 }
155
156 pub async fn new_payload_v1_metered(
158 &self,
159 payload: PayloadT::ExecutionData,
160 ) -> EngineApiResult<PayloadStatus> {
161 let start = Instant::now();
162 let res = Self::new_payload_v1(self, payload).await;
163 let elapsed = start.elapsed();
164 self.inner.metrics.latency.new_payload_v1.record(elapsed);
165 res
166 }
167
168 pub async fn new_payload_v2(
170 &self,
171 payload: PayloadT::ExecutionData,
172 ) -> EngineApiResult<PayloadStatus> {
173 let payload_or_attrs = PayloadOrAttributes::<
174 '_,
175 PayloadT::ExecutionData,
176 PayloadT::PayloadAttributes,
177 >::from_execution_payload(&payload);
178 self.inner
179 .validator
180 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
181 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
182 }
183
184 pub async fn new_payload_v2_metered(
186 &self,
187 payload: PayloadT::ExecutionData,
188 ) -> EngineApiResult<PayloadStatus> {
189 let start = Instant::now();
190 let res = Self::new_payload_v2(self, payload).await;
191 let elapsed = start.elapsed();
192 self.inner.metrics.latency.new_payload_v2.record(elapsed);
193 res
194 }
195
196 pub async fn new_payload_v3(
198 &self,
199 payload: PayloadT::ExecutionData,
200 ) -> EngineApiResult<PayloadStatus> {
201 let payload_or_attrs = PayloadOrAttributes::<
202 '_,
203 PayloadT::ExecutionData,
204 PayloadT::PayloadAttributes,
205 >::from_execution_payload(&payload);
206 self.inner
207 .validator
208 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
209
210 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
211 }
212
213 pub async fn new_payload_v3_metered(
215 &self,
216 payload: PayloadT::ExecutionData,
217 ) -> RpcResult<PayloadStatus> {
218 let start = Instant::now();
219
220 let res = Self::new_payload_v3(self, payload).await;
221 let elapsed = start.elapsed();
222 self.inner.metrics.latency.new_payload_v3.record(elapsed);
223 Ok(res?)
224 }
225
226 pub async fn new_payload_v4(
228 &self,
229 payload: PayloadT::ExecutionData,
230 ) -> EngineApiResult<PayloadStatus> {
231 let payload_or_attrs = PayloadOrAttributes::<
232 '_,
233 PayloadT::ExecutionData,
234 PayloadT::PayloadAttributes,
235 >::from_execution_payload(&payload);
236 self.inner
237 .validator
238 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
239
240 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
241 }
242
243 pub async fn new_payload_v4_metered(
245 &self,
246 payload: PayloadT::ExecutionData,
247 ) -> RpcResult<PayloadStatus> {
248 let start = Instant::now();
249 let res = Self::new_payload_v4(self, payload).await;
250
251 let elapsed = start.elapsed();
252 self.inner.metrics.latency.new_payload_v4.record(elapsed);
253 Ok(res?)
254 }
255
256 pub async fn new_payload_v5(
262 &self,
263 payload: PayloadT::ExecutionData,
264 ) -> EngineApiResult<PayloadStatus> {
265 let payload_or_attrs = PayloadOrAttributes::<
266 '_,
267 PayloadT::ExecutionData,
268 PayloadT::PayloadAttributes,
269 >::from_execution_payload(&payload);
270 self.inner
271 .validator
272 .validate_version_specific_fields(EngineApiMessageVersion::V5, payload_or_attrs)?;
273 Ok(self.inner.beacon_consensus.new_payload(payload).await?)
274 }
275
276 pub async fn new_payload_v5_metered(
278 &self,
279 payload: PayloadT::ExecutionData,
280 ) -> RpcResult<PayloadStatus> {
281 let start = Instant::now();
282 let res = Self::new_payload_v5(self, payload).await;
283 let elapsed = start.elapsed();
284 self.inner.metrics.latency.new_payload_v5.record(elapsed);
285 Ok(res?)
286 }
287
288 pub fn accept_execution_requests_hash(&self) -> bool {
290 self.inner.accept_execution_requests_hash
291 }
292}
293
294impl<Provider, EngineT, Pool, Validator, ChainSpec>
295 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
296where
297 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
298 EngineT: EngineTypes,
299 Pool: TransactionPool + 'static,
300 Validator: EngineApiValidator<EngineT>,
301 ChainSpec: EthereumHardforks + Send + Sync + 'static,
302{
303 pub async fn fork_choice_updated_v1(
310 &self,
311 state: ForkchoiceState,
312 payload_attrs: Option<EngineT::PayloadAttributes>,
313 ) -> EngineApiResult<ForkchoiceUpdated> {
314 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
315 .await
316 }
317
318 pub async fn fork_choice_updated_v1_metered(
320 &self,
321 state: ForkchoiceState,
322 payload_attrs: Option<EngineT::PayloadAttributes>,
323 ) -> EngineApiResult<ForkchoiceUpdated> {
324 let start = Instant::now();
325 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
326 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
327 res
328 }
329
330 pub async fn fork_choice_updated_v2(
335 &self,
336 state: ForkchoiceState,
337 payload_attrs: Option<EngineT::PayloadAttributes>,
338 ) -> EngineApiResult<ForkchoiceUpdated> {
339 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
340 .await
341 }
342
343 pub async fn fork_choice_updated_v2_metered(
345 &self,
346 state: ForkchoiceState,
347 payload_attrs: Option<EngineT::PayloadAttributes>,
348 ) -> EngineApiResult<ForkchoiceUpdated> {
349 let start = Instant::now();
350 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
351 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
352 res
353 }
354
355 pub async fn fork_choice_updated_v3(
360 &self,
361 state: ForkchoiceState,
362 payload_attrs: Option<EngineT::PayloadAttributes>,
363 ) -> EngineApiResult<ForkchoiceUpdated> {
364 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
365 .await
366 }
367
368 pub async fn fork_choice_updated_v3_metered(
370 &self,
371 state: ForkchoiceState,
372 payload_attrs: Option<EngineT::PayloadAttributes>,
373 ) -> EngineApiResult<ForkchoiceUpdated> {
374 let start = Instant::now();
375 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
376 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
377 res
378 }
379
380 pub async fn fork_choice_updated_v4(
385 &self,
386 state: ForkchoiceState,
387 payload_attrs: Option<EngineT::PayloadAttributes>,
388 ) -> EngineApiResult<ForkchoiceUpdated> {
389 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V4, state, payload_attrs)
390 .await
391 }
392
393 pub async fn fork_choice_updated_v4_metered(
395 &self,
396 state: ForkchoiceState,
397 payload_attrs: Option<EngineT::PayloadAttributes>,
398 ) -> EngineApiResult<ForkchoiceUpdated> {
399 let start = Instant::now();
400 let res = Self::fork_choice_updated_v4(self, state, payload_attrs).await;
401 self.inner.metrics.latency.fork_choice_updated_v4.record(start.elapsed());
402 res
403 }
404
405 async fn get_built_payload(
407 &self,
408 payload_id: PayloadId,
409 ) -> EngineApiResult<EngineT::BuiltPayload> {
410 self.inner
411 .payload_store
412 .resolve(payload_id)
413 .await
414 .ok_or(EngineApiError::UnknownPayload)?
415 .map_err(|_| EngineApiError::UnknownPayload)
416 }
417
418 async fn get_payload_inner<R>(
421 &self,
422 payload_id: PayloadId,
423 version: EngineApiMessageVersion,
424 ) -> EngineApiResult<R>
425 where
426 EngineT::BuiltPayload: TryInto<R>,
427 {
428 let timestamp = self.get_payload_timestamp(payload_id).await?;
431 validate_payload_timestamp(
432 &self.inner.chain_spec,
433 version,
434 timestamp,
435 MessageValidationKind::GetPayload,
436 )?;
437
438 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
440 warn!(?version, "could not transform built payload");
441 EngineApiError::UnknownPayload
442 })
443 }
444
445 pub async fn get_payload_v1(
455 &self,
456 payload_id: PayloadId,
457 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
458 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
459 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
460 EngineApiError::UnknownPayload
461 })
462 }
463
464 pub async fn get_payload_v1_metered(
466 &self,
467 payload_id: PayloadId,
468 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
469 let start = Instant::now();
470 let res = Self::get_payload_v1(self, payload_id).await;
471 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
472 res
473 }
474
475 pub async fn get_payload_v2(
483 &self,
484 payload_id: PayloadId,
485 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
486 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
487 }
488
489 pub async fn get_payload_v2_metered(
491 &self,
492 payload_id: PayloadId,
493 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
494 let start = Instant::now();
495 let res = Self::get_payload_v2(self, payload_id).await;
496 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
497 res
498 }
499
500 pub async fn get_payload_v3(
508 &self,
509 payload_id: PayloadId,
510 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
511 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
512 }
513
514 pub async fn get_payload_v3_metered(
516 &self,
517 payload_id: PayloadId,
518 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
519 let start = Instant::now();
520 let res = Self::get_payload_v3(self, payload_id).await;
521 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
522 res
523 }
524
525 pub async fn get_payload_v4(
533 &self,
534 payload_id: PayloadId,
535 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
536 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
537 }
538
539 pub async fn get_payload_v4_metered(
541 &self,
542 payload_id: PayloadId,
543 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
544 let start = Instant::now();
545 let res = Self::get_payload_v4(self, payload_id).await;
546 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
547 res
548 }
549
550 pub async fn get_payload_v5(
560 &self,
561 payload_id: PayloadId,
562 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
563 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
564 }
565
566 pub async fn get_payload_v5_metered(
568 &self,
569 payload_id: PayloadId,
570 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
571 let start = Instant::now();
572 let res = Self::get_payload_v5(self, payload_id).await;
573 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
574 res
575 }
576
577 pub async fn get_payload_v6(
583 &self,
584 payload_id: PayloadId,
585 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
586 self.get_payload_inner(payload_id, EngineApiMessageVersion::V6).await
587 }
588
589 pub async fn get_payload_v6_metered(
591 &self,
592 payload_id: PayloadId,
593 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV6> {
594 let start = Instant::now();
595 let res = Self::get_payload_v6(self, payload_id).await;
596 self.inner.metrics.latency.get_payload_v6.record(start.elapsed());
597 res
598 }
599
600 pub async fn get_payload_bodies_by_range_with<F, R>(
603 &self,
604 start: BlockNumber,
605 count: u64,
606 f: F,
607 ) -> EngineApiResult<Vec<Option<R>>>
608 where
609 F: Fn(Provider::Block) -> R + Send + 'static,
610 R: Send + 'static,
611 {
612 let (tx, rx) = oneshot::channel();
613 let inner = self.inner.clone();
614
615 self.inner.task_spawner.spawn_blocking_task(async move {
616 if count > MAX_PAYLOAD_BODIES_LIMIT {
617 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
618 return;
619 }
620
621 if start == 0 || count == 0 {
622 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
623 return;
624 }
625
626 let mut result = Vec::with_capacity(count as usize);
627
628 let mut end = start.saturating_add(count - 1);
630
631 if let Ok(best_block) = inner.provider.best_block_number()
634 && end > best_block {
635 end = best_block;
636 }
637
638 let earliest_block = inner.provider.earliest_block_number().unwrap_or(0);
640 for num in start..=end {
641 if num < earliest_block {
642 result.push(None);
643 continue;
644 }
645 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
646 match block_result {
647 Ok(block) => {
648 result.push(block.map(&f));
649 }
650 Err(err) => {
651 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
652 return;
653 }
654 };
655 }
656 tx.send(Ok(result)).ok();
657 });
658
659 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
660 }
661
662 pub async fn get_payload_bodies_by_range_v1(
673 &self,
674 start: BlockNumber,
675 count: u64,
676 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
677 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
678 transactions: block.body().encoded_2718_transactions(),
679 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
680 })
681 .await
682 }
683
684 pub async fn get_payload_bodies_by_range_v1_metered(
686 &self,
687 start: BlockNumber,
688 count: u64,
689 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
690 let start_time = Instant::now();
691 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
692 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
693 res
694 }
695
696 pub async fn get_payload_bodies_by_range_v2(
700 &self,
701 start: BlockNumber,
702 count: u64,
703 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
704 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 {
705 transactions: block.body().encoded_2718_transactions(),
706 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
707 block_access_list: None,
708 })
709 .await
710 }
711
712 pub async fn get_payload_bodies_by_range_v2_metered(
714 &self,
715 start: BlockNumber,
716 count: u64,
717 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
718 let start_time = Instant::now();
719 let res = Self::get_payload_bodies_by_range_v2(self, start, count).await;
720 self.inner.metrics.latency.get_payload_bodies_by_range_v2.record(start_time.elapsed());
721 res
722 }
723
724 pub async fn get_payload_bodies_by_hash_with<F, R>(
726 &self,
727 hashes: Vec<BlockHash>,
728 f: F,
729 ) -> EngineApiResult<Vec<Option<R>>>
730 where
731 F: Fn(Provider::Block) -> R + Send + 'static,
732 R: Send + 'static,
733 {
734 let len = hashes.len() as u64;
735 if len > MAX_PAYLOAD_BODIES_LIMIT {
736 return Err(EngineApiError::PayloadRequestTooLarge { len });
737 }
738
739 let (tx, rx) = oneshot::channel();
740 let inner = self.inner.clone();
741
742 self.inner.task_spawner.spawn_blocking_task(async move {
743 let mut result = Vec::with_capacity(hashes.len());
744 for hash in hashes {
745 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
746 match block_result {
747 Ok(block) => {
748 result.push(block.map(&f));
749 }
750 Err(err) => {
751 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
752 return;
753 }
754 }
755 }
756 tx.send(Ok(result)).ok();
757 });
758
759 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
760 }
761
762 async fn get_block_access_lists_by_hashes(
763 &self,
764 hashes: Vec<BlockHash>,
765 ) -> EngineApiResult<Vec<Option<Bytes>>> {
766 let len = hashes.len() as u64;
767 if len > MAX_PAYLOAD_BODIES_LIMIT {
768 return Err(EngineApiError::PayloadRequestTooLarge { len });
769 }
770
771 let (tx, rx) = oneshot::channel();
772 let bal_store = self.inner.provider.bal_store().clone();
773
774 self.inner.task_spawner.spawn_blocking_task(async move {
775 tx.send(
776 bal_store
777 .get_by_hashes(&hashes)
778 .map_err(|err| EngineApiError::Internal(Box::new(err))),
779 )
780 .ok();
781 });
782
783 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
784 }
785
786 pub async fn get_payload_bodies_by_hash_v1(
788 &self,
789 hashes: Vec<BlockHash>,
790 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
791 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
792 transactions: block.body().encoded_2718_transactions(),
793 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
794 })
795 .await
796 }
797
798 pub async fn get_payload_bodies_by_hash_v1_metered(
800 &self,
801 hashes: Vec<BlockHash>,
802 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
803 let start = Instant::now();
804 let res = Self::get_payload_bodies_by_hash_v1(self, hashes).await;
805 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
806 res
807 }
808
809 pub async fn get_payload_bodies_by_hash_v2(
813 &self,
814 hashes: Vec<BlockHash>,
815 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
816 let payload_bodies =
817 self.get_payload_bodies_by_hash_with(hashes.clone(), |block| ExecutionPayloadBodyV2 {
818 transactions: block.body().encoded_2718_transactions(),
819 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
820 block_access_list: None,
821 });
822 let block_access_lists = self.get_block_access_lists_by_hashes(hashes);
823 let (mut payload_bodies, block_access_lists) =
824 tokio::try_join!(payload_bodies, block_access_lists)?;
825
826 for (payload_body, block_access_list) in payload_bodies.iter_mut().zip(block_access_lists) {
827 if let Some(payload_body) = payload_body {
828 payload_body.block_access_list = block_access_list;
829 }
830 }
831
832 Ok(payload_bodies)
833 }
834
835 pub async fn get_payload_bodies_by_hash_v2_metered(
837 &self,
838 hashes: Vec<BlockHash>,
839 ) -> EngineApiResult<ExecutionPayloadBodiesV2> {
840 let start = Instant::now();
841 let res = Self::get_payload_bodies_by_hash_v2(self, hashes).await;
842 self.inner.metrics.latency.get_payload_bodies_by_hash_v2.record(start.elapsed());
843 res
844 }
845
846 async fn validate_and_execute_forkchoice(
860 &self,
861 version: EngineApiMessageVersion,
862 state: ForkchoiceState,
863 payload_attrs: Option<EngineT::PayloadAttributes>,
864 ) -> EngineApiResult<ForkchoiceUpdated> {
865 if let Some(ref attrs) = payload_attrs {
866 let attr_validation_res =
867 self.inner.validator.ensure_well_formed_attributes(version, attrs);
868
869 if let Err(err) = attr_validation_res {
879 let fcu_res = self.inner.beacon_consensus.fork_choice_updated(state, None).await?;
880 if fcu_res.is_invalid() || fcu_res.payload_status.is_syncing() {
881 return Ok(fcu_res)
882 }
883 return Err(err.into())
884 }
885 }
886
887 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs).await?)
888 }
889
890 pub fn capabilities(&self) -> &EngineCapabilities {
892 &self.inner.capabilities
893 }
894
895 fn get_blobs_v1(
896 &self,
897 versioned_hashes: Vec<B256>,
898 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
899 let current_timestamp =
901 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
902 if self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
903 return Err(EngineApiError::EngineObjectValidationError(
904 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
905 ));
906 }
907
908 if versioned_hashes.len() > MAX_BLOB_LIMIT {
909 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
910 }
911
912 self.inner
913 .tx_pool
914 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
915 .map_err(|err| EngineApiError::Internal(Box::new(err)))
916 }
917
918 pub fn get_blobs_v1_metered(
920 &self,
921 versioned_hashes: Vec<B256>,
922 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
923 let hashes_len = versioned_hashes.len();
924 let start = Instant::now();
925 let res = Self::get_blobs_v1(self, versioned_hashes);
926 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
927
928 if let Ok(blobs) = &res {
929 let blobs_found = blobs.iter().flatten().count();
930 let blobs_missed = hashes_len - blobs_found;
931
932 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
933 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
934 }
935
936 res
937 }
938
939 fn get_blobs_v2(
940 &self,
941 versioned_hashes: Vec<B256>,
942 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
943 let current_timestamp =
945 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
946 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
947 return Err(EngineApiError::EngineObjectValidationError(
948 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
949 ));
950 }
951
952 if versioned_hashes.len() > MAX_BLOB_LIMIT {
953 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
954 }
955
956 self.inner
957 .tx_pool
958 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
959 .map_err(|err| EngineApiError::Internal(Box::new(err)))
960 }
961
962 fn get_blobs_v3(
963 &self,
964 versioned_hashes: Vec<B256>,
965 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
966 let current_timestamp =
968 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
969 if !self.inner.chain_spec.is_osaka_active_at_timestamp(current_timestamp) {
970 return Err(EngineApiError::EngineObjectValidationError(
971 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
972 ));
973 }
974
975 if versioned_hashes.len() > MAX_BLOB_LIMIT {
976 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
977 }
978
979 if (*self.inner.is_syncing)() {
981 return Ok(None)
982 }
983
984 self.inner
985 .tx_pool
986 .get_blobs_for_versioned_hashes_v3(&versioned_hashes)
987 .map(Some)
988 .map_err(|err| EngineApiError::Internal(Box::new(err)))
989 }
990
991 fn get_blobs_v4(
992 &self,
993 versioned_hashes: Vec<B256>,
994 indices_bitarray: B128,
995 ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
996 let current_timestamp =
997 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs();
998 if !self.inner.chain_spec.is_amsterdam_active_at_timestamp(current_timestamp) {
999 return Err(EngineApiError::EngineObjectValidationError(
1000 reth_payload_primitives::EngineObjectValidationError::UnsupportedFork,
1001 ));
1002 }
1003
1004 if versioned_hashes.len() > MAX_BLOB_LIMIT {
1005 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
1006 }
1007
1008 if (*self.inner.is_syncing)() {
1010 return Ok(None)
1011 }
1012
1013 self.inner
1014 .tx_pool
1015 .get_blobs_for_versioned_hashes_v4(&versioned_hashes, indices_bitarray)
1016 .map(Some)
1017 .map_err(|err| EngineApiError::Internal(Box::new(err)))
1018 }
1019
1020 pub fn get_blobs_v2_metered(
1022 &self,
1023 versioned_hashes: Vec<B256>,
1024 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
1025 let hashes_len = versioned_hashes.len();
1026 let start = Instant::now();
1027 let res = Self::get_blobs_v2(self, versioned_hashes);
1028 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
1029
1030 if let Ok(blobs) = &res {
1031 let blobs_found = blobs.iter().flatten().count();
1032
1033 self.inner
1034 .metrics
1035 .blob_metrics
1036 .get_blobs_requests_blobs_total
1037 .increment(hashes_len as u64);
1038 self.inner
1039 .metrics
1040 .blob_metrics
1041 .get_blobs_requests_blobs_in_blobpool_total
1042 .increment(blobs_found as u64);
1043
1044 if blobs_found == hashes_len {
1045 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
1046 } else {
1047 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1048 }
1049 } else {
1050 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
1051 }
1052
1053 res
1054 }
1055
1056 pub fn get_blobs_v3_metered(
1058 &self,
1059 versioned_hashes: Vec<B256>,
1060 ) -> EngineApiResult<Option<Vec<Option<BlobAndProofV2>>>> {
1061 let hashes_len = versioned_hashes.len();
1062 let start = Instant::now();
1063 let res = Self::get_blobs_v3(self, versioned_hashes);
1064 self.inner.metrics.latency.get_blobs_v3.record(start.elapsed());
1065
1066 if let Ok(Some(blobs)) = &res {
1067 let blobs_found = blobs.iter().flatten().count();
1068 let blobs_missed = hashes_len - blobs_found;
1069
1070 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1071 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1072 }
1073
1074 res
1075 }
1076
1077 pub fn get_blobs_v4_metered(
1079 &self,
1080 versioned_hashes: Vec<B256>,
1081 indices_bitarray: B128,
1082 ) -> EngineApiResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1083 let hashes_len = versioned_hashes.len();
1084 let start = Instant::now();
1085 let res = Self::get_blobs_v4(self, versioned_hashes, indices_bitarray);
1086 self.inner.metrics.latency.get_blobs_v4.record(start.elapsed());
1087
1088 if let Ok(Some(blobs)) = &res {
1089 let blobs_found = blobs.iter().flatten().count();
1090 let blobs_missed = hashes_len - blobs_found;
1091
1092 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
1093 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
1094 }
1095
1096 res
1097 }
1098}
1099
1100#[async_trait]
1102impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
1103 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1104where
1105 Provider: HeaderProvider + BlockReader + StateProviderFactory + BalProvider + 'static,
1106 EngineT: EngineTypes<ExecutionData = ExecutionData>,
1107 Pool: TransactionPool + 'static,
1108 Validator: EngineApiValidator<EngineT>,
1109 ChainSpec: EthereumHardforks + Send + Sync + 'static,
1110{
1111 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
1115 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
1116 let payload =
1117 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
1118 Ok(self.new_payload_v1_metered(payload).await?)
1119 }
1120
1121 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
1124 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
1125 let payload = ExecutionData {
1126 payload: payload.into_payload(),
1127 sidecar: ExecutionPayloadSidecar::none(),
1128 };
1129
1130 Ok(self.new_payload_v2_metered(payload).await?)
1131 }
1132
1133 async fn new_payload_v3(
1136 &self,
1137 payload: ExecutionPayloadV3,
1138 versioned_hashes: Vec<B256>,
1139 parent_beacon_block_root: B256,
1140 ) -> RpcResult<PayloadStatus> {
1141 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
1142 let payload = ExecutionData {
1143 payload: payload.into(),
1144 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
1145 versioned_hashes,
1146 parent_beacon_block_root,
1147 }),
1148 };
1149
1150 Ok(self.new_payload_v3_metered(payload).await?)
1151 }
1152
1153 async fn new_payload_v4(
1156 &self,
1157 payload: ExecutionPayloadV3,
1158 versioned_hashes: Vec<B256>,
1159 parent_beacon_block_root: B256,
1160 requests: RequestsOrHash,
1161 ) -> RpcResult<PayloadStatus> {
1162 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
1163
1164 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1166 return Err(EngineApiError::UnexpectedRequestsHash.into());
1167 }
1168
1169 let payload = ExecutionData {
1170 payload: payload.into(),
1171 sidecar: ExecutionPayloadSidecar::v4(
1172 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1173 PraguePayloadFields { requests },
1174 ),
1175 };
1176
1177 Ok(self.new_payload_v4_metered(payload).await?)
1178 }
1179
1180 async fn new_payload_v5(
1186 &self,
1187 payload: ExecutionPayloadV4,
1188 versioned_hashes: Vec<B256>,
1189 parent_beacon_block_root: B256,
1190 requests: RequestsOrHash,
1191 ) -> RpcResult<PayloadStatus> {
1192 trace!(target: "rpc::engine", "Serving engine_newPayloadV5");
1193 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
1195 return Err(EngineApiError::UnexpectedRequestsHash.into());
1196 }
1197
1198 let payload = ExecutionData {
1199 payload: payload.into(),
1200 sidecar: ExecutionPayloadSidecar::v4(
1201 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
1202 PraguePayloadFields { requests },
1203 ),
1204 };
1205
1206 Ok(self.new_payload_v5_metered(payload).await?)
1207 }
1208
1209 async fn fork_choice_updated_v1(
1214 &self,
1215 fork_choice_state: ForkchoiceState,
1216 payload_attributes: Option<EngineT::PayloadAttributes>,
1217 ) -> RpcResult<ForkchoiceUpdated> {
1218 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
1219 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
1220 }
1221
1222 async fn fork_choice_updated_v2(
1225 &self,
1226 fork_choice_state: ForkchoiceState,
1227 payload_attributes: Option<EngineT::PayloadAttributes>,
1228 ) -> RpcResult<ForkchoiceUpdated> {
1229 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
1230 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
1231 }
1232
1233 async fn fork_choice_updated_v3(
1237 &self,
1238 fork_choice_state: ForkchoiceState,
1239 payload_attributes: Option<EngineT::PayloadAttributes>,
1240 ) -> RpcResult<ForkchoiceUpdated> {
1241 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
1242 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
1243 }
1244
1245 async fn fork_choice_updated_v4(
1249 &self,
1250 fork_choice_state: ForkchoiceState,
1251 payload_attributes: Option<EngineT::PayloadAttributes>,
1252 ) -> RpcResult<ForkchoiceUpdated> {
1253 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV4");
1254 Ok(self.fork_choice_updated_v4_metered(fork_choice_state, payload_attributes).await?)
1255 }
1256
1257 async fn get_payload_v1(
1269 &self,
1270 payload_id: PayloadId,
1271 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1272 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1273 Ok(self.get_payload_v1_metered(payload_id).await?)
1274 }
1275
1276 async fn get_payload_v2(
1286 &self,
1287 payload_id: PayloadId,
1288 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1289 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1290 Ok(self.get_payload_v2_metered(payload_id).await?)
1291 }
1292
1293 async fn get_payload_v3(
1303 &self,
1304 payload_id: PayloadId,
1305 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1306 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1307 Ok(self.get_payload_v3_metered(payload_id).await?)
1308 }
1309
1310 async fn get_payload_v4(
1320 &self,
1321 payload_id: PayloadId,
1322 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1323 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1324 Ok(self.get_payload_v4_metered(payload_id).await?)
1325 }
1326
1327 async fn get_payload_v5(
1337 &self,
1338 payload_id: PayloadId,
1339 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1340 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1341 Ok(self.get_payload_v5_metered(payload_id).await?)
1342 }
1343
1344 async fn get_payload_v6(
1350 &self,
1351 payload_id: PayloadId,
1352 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV6> {
1353 trace!(target: "rpc::engine", "Serving engine_getPayloadV6");
1354 Ok(self.get_payload_v6_metered(payload_id).await?)
1355 }
1356
1357 async fn get_payload_bodies_by_hash_v1(
1360 &self,
1361 block_hashes: Vec<BlockHash>,
1362 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1363 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1364 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1365 }
1366
1367 async fn get_payload_bodies_by_hash_v2(
1373 &self,
1374 block_hashes: Vec<BlockHash>,
1375 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1376 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV2");
1377 Ok(self.get_payload_bodies_by_hash_v2_metered(block_hashes).await?)
1378 }
1379
1380 async fn get_payload_bodies_by_range_v1(
1397 &self,
1398 start: U64,
1399 count: U64,
1400 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1401 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1402 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1403 }
1404
1405 async fn get_payload_bodies_by_range_v2(
1411 &self,
1412 start: U64,
1413 count: U64,
1414 ) -> RpcResult<ExecutionPayloadBodiesV2> {
1415 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV2");
1416 Ok(self.get_payload_bodies_by_range_v2_metered(start.to(), count.to()).await?)
1417 }
1418
1419 async fn get_client_version_v1(
1423 &self,
1424 client: ClientVersionV1,
1425 ) -> RpcResult<Vec<ClientVersionV1>> {
1426 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1427 Ok(Self::get_client_version_v1(self, client)?)
1428 }
1429
1430 async fn exchange_capabilities(&self, capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1433 trace!(target: "rpc::engine", "Serving engine_exchangeCapabilities");
1434
1435 let el_caps = self.capabilities();
1436 el_caps.log_capability_mismatches(&capabilities);
1437
1438 Ok(el_caps.list())
1439 }
1440
1441 async fn get_blobs_v1(
1442 &self,
1443 versioned_hashes: Vec<B256>,
1444 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1445 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1446 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1447 }
1448
1449 async fn get_blobs_v2(
1450 &self,
1451 versioned_hashes: Vec<B256>,
1452 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1453 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1454 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1455 }
1456
1457 async fn get_blobs_v3(
1458 &self,
1459 versioned_hashes: Vec<B256>,
1460 ) -> RpcResult<Option<Vec<Option<BlobAndProofV2>>>> {
1461 trace!(target: "rpc::engine", "Serving engine_getBlobsV3");
1462 Ok(self.get_blobs_v3_metered(versioned_hashes)?)
1463 }
1464
1465 async fn get_blobs_v4(
1466 &self,
1467 versioned_hashes: Vec<B256>,
1468 indices_bitarray: B128,
1469 ) -> RpcResult<Option<Vec<Option<BlobCellsAndProofsV1>>>> {
1470 trace!(target: "rpc::engine", "Serving engine_getBlobsV4");
1471 Ok(self.get_blobs_v4_metered(versioned_hashes, indices_bitarray)?)
1472 }
1473}
1474
1475impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1476 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1477where
1478 EngineT: EngineTypes,
1479 Self: EngineApiServer<EngineT>,
1480{
1481 fn into_rpc_module(self) -> RpcModule<()> {
1482 EngineApiServer::<EngineT>::into_rpc(self).remove_context()
1483 }
1484}
1485
1486impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1487 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1488where
1489 PayloadT: PayloadTypes,
1490{
1491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1492 f.debug_struct("EngineApi").finish_non_exhaustive()
1493 }
1494}
1495
1496impl<Provider, PayloadT, Pool, Validator, ChainSpec> Clone
1497 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1498where
1499 PayloadT: PayloadTypes,
1500{
1501 fn clone(&self) -> Self {
1502 Self { inner: Arc::clone(&self.inner) }
1503 }
1504}
1505
/// State shared by every clone of [`EngineApi`], which holds this struct behind an `Arc`.
struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Value of the `Provider` type parameter.
    /// NOTE(review): presumably the storage access used by the body/header lookups — confirm.
    provider: Provider,
    /// Shared chain specification.
    chain_spec: Arc<ChainSpec>,
    /// Handle to the consensus engine for this payload type.
    /// NOTE(review): presumably the channel over which payload and forkchoice messages are
    /// sent — confirm.
    beacon_consensus: ConsensusEngineHandle<PayloadT>,
    /// Payload store for this payload type.
    /// NOTE(review): presumably backs the `getPayload` handlers — confirm.
    payload_store: PayloadStore<PayloadT>,
    /// Task runtime. NOTE(review): presumably used to spawn background work — confirm.
    task_spawner: Runtime,
    /// Metrics for the engine API.
    metrics: EngineApiMetrics,
    /// Client version record for this node.
    /// NOTE(review): presumably what `engine_getClientVersionV1` reports — confirm.
    client: ClientVersionV1,
    /// Engine capability set.
    /// NOTE(review): presumably what `engine_exchangeCapabilities` lists — confirm.
    capabilities: EngineCapabilities,
    /// Value of the `Pool` type parameter.
    /// NOTE(review): presumably the transaction pool the blob handlers read — confirm.
    tx_pool: Pool,
    /// Value of the `Validator` type parameter.
    /// NOTE(review): presumably validates payloads and attributes — confirm.
    validator: Validator,
    /// When `false`, the `engine_newPayloadV4`/`V5` handlers reject execution requests that
    /// are supplied as a hash, returning `EngineApiError::UnexpectedRequestsHash`.
    accept_execution_requests_hash: bool,
    /// Shareable (`Send + Sync`) callback returning a `bool`.
    /// NOTE(review): presumably reports whether the node is still syncing — confirm.
    is_syncing: Arc<dyn Fn() -> bool + Send + Sync>,
}
1532
1533#[cfg(test)]
1534mod tests {
1535 use super::*;
1536 use alloy_eips::{eip7685::Requests, NumHash};
1537 use alloy_primitives::{keccak256, Address, Bytes, Sealed, B256};
1538 use alloy_rpc_types_engine::{
1539 ClientCode, ClientVersionV1, ExecutionPayloadV2, PayloadAttributes, PayloadStatusEnum,
1540 };
1541 use assert_matches::assert_matches;
1542 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
1543 use reth_engine_primitives::{BeaconEngineMessage, OnForkChoiceUpdated};
1544 use reth_ethereum_engine_primitives::EthEngineTypes;
1545 use reth_ethereum_primitives::Block;
1546 use reth_network_api::{
1547 noop::NoopNetwork, EthProtocolInfo, NetworkError, NetworkInfo, NetworkStatus,
1548 };
1549 use reth_node_ethereum::EthereumEngineValidator;
1550 use reth_payload_builder::test_utils::spawn_test_payload_service;
1551 use reth_provider::{test_utils::MockEthProvider, BalStoreHandle, InMemoryBalStore};
1552 use reth_tasks::Runtime;
1553 use reth_transaction_pool::noop::NoopTransactionPool;
1554 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1555
1556 fn setup_engine_api() -> (
1557 EngineApiTestHandle,
1558 EngineApi<
1559 Arc<MockEthProvider>,
1560 EthEngineTypes,
1561 NoopTransactionPool,
1562 EthereumEngineValidator,
1563 ChainSpec,
1564 >,
1565 ) {
1566 let client = ClientVersionV1 {
1567 code: ClientCode::RH,
1568 name: "Reth".to_string(),
1569 version: "v0.2.0-beta.5".to_string(),
1570 commit: "defa64b2".to_string(),
1571 };
1572
1573 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1574 let provider = Arc::new(MockEthProvider::default());
1575 let payload_store = spawn_test_payload_service();
1576 let (to_engine, engine_rx) = unbounded_channel();
1577 let task_executor = Runtime::test();
1578 let api = EngineApi::new(
1579 provider.clone(),
1580 chain_spec.clone(),
1581 ConsensusEngineHandle::new(to_engine),
1582 payload_store.into(),
1583 NoopTransactionPool::default(),
1584 task_executor,
1585 client,
1586 EngineCapabilities::default(),
1587 EthereumEngineValidator::new(chain_spec.clone()),
1588 false,
1589 NoopNetwork::default(),
1590 );
1591 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1592 (handle, api)
1593 }
1594
1595 #[tokio::test]
1596 async fn engine_client_version_v1() {
1597 let client = ClientVersionV1 {
1598 code: ClientCode::RH,
1599 name: "Reth".to_string(),
1600 version: "v0.2.0-beta.5".to_string(),
1601 commit: "defa64b2".to_string(),
1602 };
1603 let (_, api) = setup_engine_api();
1604 let res = api.get_client_version_v1(client.clone());
1605 assert_eq!(res.unwrap(), vec![client]);
1606 }
1607
1608 #[tokio::test]
1609 async fn get_payload_bodies_by_hash_v2_returns_block_access_list_from_store() {
1610 let bal_store = BalStoreHandle::new(InMemoryBalStore::default());
1611 let mut provider = MockEthProvider::default();
1612 provider.bal_store = bal_store.clone();
1613 let provider = Arc::new(provider);
1614
1615 let client = ClientVersionV1 {
1616 code: ClientCode::RH,
1617 name: "Reth".to_string(),
1618 version: "v0.2.0-beta.5".to_string(),
1619 commit: "defa64b2".to_string(),
1620 };
1621 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1622 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1623 let (to_engine, _engine_rx) = unbounded_channel();
1624 let api = EngineApi::new(
1625 provider.clone(),
1626 chain_spec.clone(),
1627 ConsensusEngineHandle::new(to_engine),
1628 payload_store.into(),
1629 NoopTransactionPool::default(),
1630 Runtime::test(),
1631 client,
1632 EngineCapabilities::default(),
1633 EthereumEngineValidator::new(chain_spec),
1634 false,
1635 NoopNetwork::default(),
1636 );
1637
1638 let mut block = Block::default();
1639 block.header.number = 1;
1640 let block_hash = block.header.hash_slow();
1641 provider.add_block(block_hash, block);
1642
1643 let mut block_without_bal = Block::default();
1644 block_without_bal.header.number = 2;
1645 let block_without_bal_hash = block_without_bal.header.hash_slow();
1646 provider.add_block(block_without_bal_hash, block_without_bal);
1647
1648 let raw_bal = Bytes::from_static(&[alloy_rlp::EMPTY_LIST_CODE]);
1649 let sealed_bal = Sealed::new_unchecked(raw_bal.clone(), keccak256(&raw_bal));
1650 bal_store.insert(NumHash::new(1, block_hash), sealed_bal).unwrap();
1651
1652 let missing_hash = B256::with_last_byte(3);
1653 let response = api
1654 .get_payload_bodies_by_hash_v2(vec![block_hash, block_without_bal_hash, missing_hash])
1655 .await
1656 .unwrap();
1657
1658 assert_eq!(response.len(), 3);
1659 assert_eq!(response[0].as_ref().unwrap().block_access_list, Some(raw_bal));
1660 assert_eq!(response[1].as_ref().unwrap().block_access_list, None);
1661 assert!(response[2].is_none());
1662 }
1663
/// Test-side counterpart of the [`EngineApi`] built by `setup_engine_api`.
struct EngineApiTestHandle {
    /// Chain spec the API was built with; no test in this module reads it, hence the lint
    /// suppression.
    #[allow(dead_code)]
    chain_spec: Arc<ChainSpec>,
    /// The mock provider shared with the API, so tests can add blocks to it.
    provider: Arc<MockEthProvider>,
    /// Receiving end of the channel on which the API sends messages to the engine.
    from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
}
1670
1671 #[tokio::test]
1672 async fn forwards_responses_to_consensus_engine() {
1673 let (mut handle, api) = setup_engine_api();
1674
1675 tokio::spawn(async move {
1676 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1677 let execution_data = ExecutionData {
1678 payload: payload_v1.into(),
1679 sidecar: ExecutionPayloadSidecar::none(),
1680 };
1681
1682 api.new_payload_v1(execution_data).await.unwrap();
1683 });
1684 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1685 }
1686
1687 #[tokio::test]
1688 async fn new_payload_v5_accepts_amsterdam_payloads() {
1689 let chain_spec = Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1690 let provider = Arc::new(MockEthProvider::default());
1691 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1692 let (to_engine, mut engine_rx) = unbounded_channel();
1693
1694 let api = EngineApi::new(
1695 provider,
1696 chain_spec.clone(),
1697 ConsensusEngineHandle::new(to_engine),
1698 payload_store.into(),
1699 NoopTransactionPool::default(),
1700 Runtime::test(),
1701 ClientVersionV1 {
1702 code: ClientCode::RH,
1703 name: "Reth".to_string(),
1704 version: "v0.0.0-test".to_string(),
1705 commit: "test".to_string(),
1706 },
1707 EngineCapabilities::default(),
1708 EthereumEngineValidator::new(chain_spec),
1709 false,
1710 NoopNetwork::default(),
1711 );
1712
1713 tokio::spawn(async move {
1714 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1715 let payload = ExecutionPayloadV4 {
1716 payload_inner: ExecutionPayloadV3 {
1717 payload_inner: ExecutionPayloadV2 {
1718 payload_inner: payload_v1,
1719 withdrawals: Vec::new(),
1720 },
1721 blob_gas_used: 0,
1722 excess_blob_gas: 0,
1723 },
1724 block_access_list: Bytes::from_static(b"bal"),
1725 slot_number: 1,
1726 };
1727 let execution_data = ExecutionData {
1728 payload: payload.into(),
1729 sidecar: ExecutionPayloadSidecar::v4(
1730 CancunPayloadFields {
1731 versioned_hashes: Vec::new(),
1732 parent_beacon_block_root: B256::ZERO,
1733 },
1734 PraguePayloadFields { requests: RequestsOrHash::Requests(Requests::default()) },
1735 ),
1736 };
1737
1738 api.new_payload_v5(execution_data).await.unwrap();
1739 });
1740
1741 assert_matches!(engine_rx.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1742 }
1743
/// Minimal [`NetworkInfo`] stand-in whose sync status is fixed at construction time.
#[derive(Clone)]
struct TestNetworkInfo {
    /// Value returned by both `is_syncing` and `is_initially_syncing`.
    syncing: bool,
}
1748
1749 impl NetworkInfo for TestNetworkInfo {
1750 fn local_addr(&self) -> std::net::SocketAddr {
1751 (std::net::Ipv4Addr::UNSPECIFIED, 0).into()
1752 }
1753
1754 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
1755 #[allow(deprecated)]
1756 Ok(NetworkStatus {
1757 client_version: "test".to_string(),
1758 protocol_version: 5,
1759 eth_protocol_info: EthProtocolInfo {
1760 network: 1,
1761 difficulty: None,
1762 genesis: Default::default(),
1763 config: Default::default(),
1764 head: Default::default(),
1765 },
1766 capabilities: vec![],
1767 })
1768 }
1769
1770 fn chain_id(&self) -> u64 {
1771 1
1772 }
1773
1774 fn is_syncing(&self) -> bool {
1775 self.syncing
1776 }
1777
1778 fn is_initially_syncing(&self) -> bool {
1779 self.syncing
1780 }
1781 }
1782
1783 #[tokio::test]
1784 async fn get_blobs_v3_returns_null_when_syncing() {
1785 let chain_spec: Arc<ChainSpec> =
1786 Arc::new(ChainSpecBuilder::mainnet().osaka_activated().build());
1787 let provider = Arc::new(MockEthProvider::default());
1788 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1789 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1790
1791 let api = EngineApi::new(
1792 provider,
1793 chain_spec.clone(),
1794 ConsensusEngineHandle::new(to_engine),
1795 payload_store.into(),
1796 NoopTransactionPool::default(),
1797 Runtime::test(),
1798 ClientVersionV1 {
1799 code: ClientCode::RH,
1800 name: "Reth".to_string(),
1801 version: "v0.0.0-test".to_string(),
1802 commit: "test".to_string(),
1803 },
1804 EngineCapabilities::default(),
1805 EthereumEngineValidator::new(chain_spec),
1806 false,
1807 TestNetworkInfo { syncing: true },
1808 );
1809
1810 let res = api.get_blobs_v3_metered(vec![B256::ZERO]);
1811 assert_matches!(res, Ok(None));
1812 }
1813
1814 #[tokio::test]
1815 async fn get_blobs_v4_returns_null_when_syncing() {
1816 let chain_spec: Arc<ChainSpec> =
1817 Arc::new(ChainSpecBuilder::mainnet().amsterdam_activated().build());
1818 let provider = Arc::new(MockEthProvider::default());
1819 let payload_store = spawn_test_payload_service::<EthEngineTypes>();
1820 let (to_engine, _engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
1821
1822 let api = EngineApi::new(
1823 provider,
1824 chain_spec.clone(),
1825 ConsensusEngineHandle::new(to_engine),
1826 payload_store.into(),
1827 NoopTransactionPool::default(),
1828 Runtime::test(),
1829 ClientVersionV1 {
1830 code: ClientCode::RH,
1831 name: "Reth".to_string(),
1832 version: "v0.0.0-test".to_string(),
1833 commit: "test".to_string(),
1834 },
1835 EngineCapabilities::default(),
1836 EthereumEngineValidator::new(chain_spec),
1837 false,
1838 TestNetworkInfo { syncing: true },
1839 );
1840
1841 let res = api.get_blobs_v4_metered(vec![B256::ZERO], B128::from(1u128));
1842 assert_matches!(res, Ok(None));
1843 }
1844
1845 #[tokio::test]
1846 async fn fcu_v3_syncing_precedes_invalid_payload_attributes_validation() {
1847 let (mut handle, api) = setup_engine_api();
1848
1849 let state = ForkchoiceState {
1850 head_block_hash: B256::from([0x11; 32]),
1851 safe_block_hash: B256::ZERO,
1852 finalized_block_hash: B256::ZERO,
1853 };
1854 let payload_attributes = PayloadAttributes {
1855 timestamp: 1,
1856 prev_randao: B256::ZERO,
1857 suggested_fee_recipient: Address::ZERO,
1858 withdrawals: Some(vec![]),
1859 parent_beacon_block_root: None,
1861 slot_number: None,
1862 };
1863
1864 let api_task = tokio::spawn(async move {
1865 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1866 });
1867
1868 let request =
1869 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1870 .await
1871 .expect("timed out waiting for forkchoiceUpdated request")
1872 .expect("expected forkchoiceUpdated request");
1873 let response_tx = match request {
1874 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1875 assert!(
1876 payload_attrs.is_none(),
1877 "FCU for syncing state should be evaluated before payload attributes"
1878 );
1879 tx
1880 }
1881 other => panic!("unexpected engine message: {other:?}"),
1882 };
1883
1884 response_tx.send(Ok(OnForkChoiceUpdated::syncing())).expect("send syncing response");
1885
1886 let response = api_task
1887 .await
1888 .expect("api task should not panic")
1889 .expect("forkchoiceUpdatedV3 should return a syncing response");
1890 assert!(response.payload_status.is_syncing());
1891 assert!(response.payload_id.is_none());
1892 }
1893
1894 #[tokio::test]
1895 async fn fcu_v3_valid_forkchoice_missing_beacon_root_returns_invalid_attributes() {
1896 let (mut handle, api) = setup_engine_api();
1897
1898 let state = ForkchoiceState {
1899 head_block_hash: B256::from([0x22; 32]),
1900 safe_block_hash: B256::ZERO,
1901 finalized_block_hash: B256::ZERO,
1902 };
1903 let payload_attributes = PayloadAttributes {
1904 timestamp: 1,
1905 prev_randao: B256::ZERO,
1906 suggested_fee_recipient: Address::ZERO,
1907 withdrawals: Some(vec![]),
1908 parent_beacon_block_root: None,
1909 slot_number: None,
1910 };
1911
1912 let api_task = tokio::spawn(async move {
1913 api.fork_choice_updated_v3(state, Some(payload_attributes)).await
1914 });
1915
1916 let request =
1917 tokio::time::timeout(std::time::Duration::from_secs(1), handle.from_api.recv())
1918 .await
1919 .expect("timed out waiting for forkchoiceUpdated request")
1920 .expect("expected forkchoiceUpdated request");
1921
1922 let response_tx = match request {
1923 BeaconEngineMessage::ForkchoiceUpdated { payload_attrs, tx, .. } => {
1924 assert!(
1925 payload_attrs.is_none(),
1926 "when attrs are invalid, API should first evaluate forkchoice without attrs"
1927 );
1928 tx
1929 }
1930 other => panic!("unexpected engine message: {other:?}"),
1931 };
1932
1933 response_tx
1934 .send(Ok(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1935 PayloadStatusEnum::Valid,
1936 ))))
1937 .expect("send valid response");
1938
1939 let response = api_task.await.expect("api task should not panic");
1940 assert_matches!(
1941 response,
1942 Err(EngineApiError::EngineObjectValidationError(
1943 reth_payload_primitives::EngineObjectValidationError::PayloadAttributes(_)
1944 ))
1945 );
1946
1947 match tokio::time::timeout(std::time::Duration::from_millis(100), handle.from_api.recv())
1948 .await
1949 {
1950 Err(_) | Ok(None) => {}
1951 Ok(Some(BeaconEngineMessage::ForkchoiceUpdated { .. })) => {
1952 panic!("no second forkchoiceUpdated call should be sent when attrs are invalid")
1953 }
1954 Ok(Some(other)) => panic!("unexpected engine message: {other:?}"),
1955 }
1956 }
1957
1958 mod get_payload_bodies {
1960 use super::*;
1961 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1962 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1963
1964 #[tokio::test]
1965 async fn invalid_params() {
1966 let (_, api) = setup_engine_api();
1967
1968 let by_range_tests = [
1969 (0, 0),
1971 (0, 1),
1972 (1, 0),
1973 ];
1974
1975 for (start, count) in by_range_tests {
1977 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1978 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1979 }
1980 }
1981
1982 #[tokio::test]
1983 async fn request_too_large() {
1984 let (_, api) = setup_engine_api();
1985
1986 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1987 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1988 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1989 }
1990
1991 #[tokio::test]
1992 async fn returns_payload_bodies() {
1993 let mut rng = generators::rng();
1994 let (handle, api) = setup_engine_api();
1995
1996 let (start, count) = (1, 10);
1997 let blocks = random_block_range(
1998 &mut rng,
1999 start..=start + count - 1,
2000 BlockRangeParams { tx_count: 0..2, ..Default::default() },
2001 );
2002 handle
2003 .provider
2004 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
2005
2006 let expected = blocks
2007 .iter()
2008 .cloned()
2009 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
2010 .collect::<Vec<_>>();
2011
2012 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2013 assert_eq!(res, expected);
2014 }
2015
2016 #[tokio::test]
2017 async fn returns_payload_bodies_with_gaps() {
2018 let mut rng = generators::rng();
2019 let (handle, api) = setup_engine_api();
2020
2021 let (start, count) = (1, 100);
2022 let blocks = random_block_range(
2023 &mut rng,
2024 start..=start + count - 1,
2025 BlockRangeParams { tx_count: 0..2, ..Default::default() },
2026 );
2027
2028 let first_missing_range = 26..=50;
2030 let second_missing_range = 76..=100;
2031 handle.provider.extend_blocks(
2032 blocks
2033 .iter()
2034 .filter(|b| {
2035 !first_missing_range.contains(&b.number) &&
2036 !second_missing_range.contains(&b.number)
2037 })
2038 .map(|b| (b.hash(), b.clone().into_block())),
2039 );
2040
2041 let expected = blocks
2042 .iter()
2043 .filter(|b| !second_missing_range.contains(&b.number))
2046 .cloned()
2047 .map(|b| {
2048 if first_missing_range.contains(&b.number) {
2049 None
2050 } else {
2051 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2052 }
2053 })
2054 .collect::<Vec<_>>();
2055
2056 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
2057 assert_eq!(res, expected);
2058
2059 let expected = blocks
2060 .iter()
2061 .cloned()
2062 .map(|b| {
2065 if first_missing_range.contains(&b.number) ||
2066 second_missing_range.contains(&b.number)
2067 {
2068 None
2069 } else {
2070 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
2071 }
2072 })
2073 .collect::<Vec<_>>();
2074
2075 let hashes = blocks.iter().map(|b| b.hash()).collect();
2076 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
2077 assert_eq!(res, expected);
2078 }
2079 }
2080}