reth_rpc_engine_api/
reth_engine_api.rs1use crate::EngineApiError;
2use alloy_rlp::Decodable;
3use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated};
4use async_trait::async_trait;
5use jsonrpsee_core::RpcResult;
6use reth_engine_primitives::ConsensusEngineHandle;
7use reth_payload_primitives::PayloadTypes;
8use reth_primitives_traits::SealedBlock;
9use reth_rpc_api::{RethEngineApiServer, RethNewPayloadInput, RethPayloadStatus};
10use tracing::trace;
11
12#[derive(Debug)]
18pub struct RethEngineApi<Payload: PayloadTypes> {
19 beacon_engine_handle: ConsensusEngineHandle<Payload>,
20}
21
22impl<Payload: PayloadTypes> RethEngineApi<Payload> {
23 pub const fn new(beacon_engine_handle: ConsensusEngineHandle<Payload>) -> Self {
25 Self { beacon_engine_handle }
26 }
27}
28
29#[async_trait]
30impl<Payload: PayloadTypes> RethEngineApiServer<Payload::ExecutionData> for RethEngineApi<Payload> {
31 async fn reth_new_payload(
32 &self,
33 input: RethNewPayloadInput<Payload::ExecutionData>,
34 wait_for_persistence: Option<bool>,
35 wait_for_caches: Option<bool>,
36 ) -> RpcResult<RethPayloadStatus> {
37 let wait_for_persistence = wait_for_persistence.unwrap_or(true);
38 let wait_for_caches = wait_for_caches.unwrap_or(true);
39 trace!(target: "rpc::engine", wait_for_persistence, wait_for_caches, "Serving reth_newPayload");
40
41 let payload = match input {
42 RethNewPayloadInput::ExecutionData(data) => data,
43 RethNewPayloadInput::BlockRlp(rlp) => {
44 let block = Decodable::decode(&mut rlp.as_ref())
45 .map_err(|err| EngineApiError::Internal(Box::new(err)))?;
46 Payload::block_to_payload(SealedBlock::new_unhashed(block))
47 }
48 };
49
50 let (status, timings) = self
51 .beacon_engine_handle
52 .reth_new_payload(payload, wait_for_persistence, wait_for_caches)
53 .await
54 .map_err(EngineApiError::from)?;
55 Ok(RethPayloadStatus {
56 status,
57 latency_us: timings.latency.as_micros() as u64,
58 persistence_wait_us: timings.persistence_wait.map(|d| d.as_micros() as u64),
59 execution_cache_wait_us: timings.execution_cache_wait.map(|d| d.as_micros() as u64),
60 sparse_trie_wait_us: timings.sparse_trie_wait.map(|d| d.as_micros() as u64),
61 })
62 }
63
64 async fn reth_forkchoice_updated(
65 &self,
66 forkchoice_state: ForkchoiceState,
67 ) -> RpcResult<ForkchoiceUpdated> {
68 trace!(target: "rpc::engine", "Serving reth_forkchoiceUpdated");
69 self.beacon_engine_handle
70 .fork_choice_updated(forkchoice_state, None)
71 .await
72 .map_err(|e| EngineApiError::from(e).into())
73 }
74}