Skip to main content

reth_engine_primitives/
message.rs

1use crate::{
2    error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_eips::eip4895::Withdrawal;
5use alloy_primitives::{Bytes, B256};
6use alloy_rpc_types_engine::{
7    ExecutionData, ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError,
8    ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
9};
10use core::{
11    fmt::{self, Display},
12    future::Future,
13    pin::Pin,
14    task::{ready, Context, Poll},
15};
16use futures::{future::Either, FutureExt, TryFutureExt};
17use reth_errors::RethResult;
18use reth_payload_builder_primitives::PayloadBuilderError;
19use reth_payload_primitives::PayloadTypes;
20use std::time::{Duration, Instant};
21use tokio::sync::{mpsc::UnboundedSender, oneshot};
22
23/// Type alias for backwards compat
24#[deprecated(note = "Use ConsensusEngineHandle instead")]
25pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
26
27/// Represents the outcome of forkchoice update.
28///
29/// This is a future that resolves to [`ForkChoiceUpdateResult`]
30#[must_use = "futures do nothing unless you `.await` or poll them"]
31#[derive(Debug)]
32pub struct OnForkChoiceUpdated {
33    /// Represents the status of the forkchoice update.
34    ///
35    /// Note: This is separate from the response `fut`, because we still can return an error
36    /// depending on the payload attributes, even if the forkchoice update itself is valid.
37    forkchoice_status: ForkchoiceStatus,
38    /// Returns the result of the forkchoice update.
39    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
40}
41
42// === impl OnForkChoiceUpdated ===
43
44impl OnForkChoiceUpdated {
45    /// Returns the determined status of the received `ForkchoiceState`.
46    pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
47        self.forkchoice_status
48    }
49
50    /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
51    pub fn syncing() -> Self {
52        let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
53        Self {
54            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
55            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
56        }
57    }
58
59    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
60    /// payload attributes were provided.
61    pub fn valid(status: PayloadStatus) -> Self {
62        Self {
63            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
64            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
65        }
66    }
67
68    /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
69    /// forkchoice update failed due to an invalid payload.
70    pub fn with_invalid(status: PayloadStatus) -> Self {
71        Self {
72            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
73            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
74        }
75    }
76
77    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
78    /// given state is considered invalid
79    pub fn invalid_state() -> Self {
80        Self {
81            forkchoice_status: ForkchoiceStatus::Invalid,
82            fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
83        }
84    }
85
86    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
87    /// payload attributes were invalid.
88    pub fn invalid_payload_attributes() -> Self {
89        Self {
90            // This is valid because this is only reachable if the state and payload is valid
91            forkchoice_status: ForkchoiceStatus::Valid,
92            fut: Either::Left(futures::future::ready(Err(
93                ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
94            ))),
95        }
96    }
97
98    /// If the forkchoice update was successful and no payload attributes were provided, this method
99    pub const fn updated_with_pending_payload_id(
100        payload_status: PayloadStatus,
101        pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
102    ) -> Self {
103        Self {
104            forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
105            fut: Either::Right(PendingPayloadId {
106                payload_status: Some(payload_status),
107                pending_payload_id,
108            }),
109        }
110    }
111}
112
113impl Future for OnForkChoiceUpdated {
114    type Output = ForkChoiceUpdateResult;
115
116    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117        self.get_mut().fut.poll_unpin(cx)
118    }
119}
120
121/// A future that returns the payload id of a yet to be initiated payload job after a successful
122/// forkchoice update
123#[derive(Debug)]
124struct PendingPayloadId {
125    payload_status: Option<PayloadStatus>,
126    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
127}
128
129impl Future for PendingPayloadId {
130    type Output = ForkChoiceUpdateResult;
131
132    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        let this = self.get_mut();
134        let res = ready!(this.pending_payload_id.poll_unpin(cx));
135        match res {
136            Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
137                payload_status: this.payload_status.take().expect("Polled after completion"),
138                payload_id: Some(payload_id),
139            })),
140            Err(_) | Ok(Err(_)) => {
141                // failed to initiate a payload build job
142                Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
143            }
144        }
145    }
146}
147
148/// Timing breakdown for `reth_newPayload` responses.
149#[derive(Debug, Clone, Copy)]
150pub struct NewPayloadTimings {
151    /// Server-side execution latency.
152    pub latency: Duration,
153    /// Time spent waiting on persistence, including both time this message spent queued
154    /// due to persistence backpressure and, when `wait_for_persistence` was requested,
155    /// the explicit wait for in-flight persistence to complete.
156    pub persistence_wait: Duration,
157    /// Time spent waiting for the execution cache lock.
158    ///
159    /// `None` when wasn't asked to wait for execution cache.
160    pub execution_cache_wait: Option<Duration>,
161    /// Time spent waiting for the sparse trie cache lock.
162    ///
163    /// `None` when wasn't asked to wait for sparse trie cache.
164    pub sparse_trie_wait: Option<Duration>,
165}
166
167/// Additional data for big block payloads that merge multiple real blocks.
168///
169/// This is used by the `reth_newPayload` endpoint to pass environment switches
170/// and prior block hashes needed for correct multi-segment execution.
171#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
172pub struct BigBlockData<ExecutionData> {
173    /// Environment switches at block boundaries.
174    /// Each entry is `(cumulative_tx_count, execution_data_of_next_block)`.
175    ///
176    /// The first entry at index 0 represents the **original unmutated** base block's
177    /// `ExecutionData`, which must be used to derive the initial EVM environment.
178    pub env_switches: Vec<ExecutionData>,
179    /// Block number → real block hash for blocks covered by previous big blocks in a sequence.
180    /// When replaying chained big blocks, the BLOCKHASH opcode needs real hashes for blocks
181    /// that were merged into earlier big blocks (and thus not individually persisted).
182    pub prior_block_hashes: Vec<(u64, alloy_primitives::B256)>,
183    /// Block number for this big block.
184    pub block_number: u64,
185    /// Merged block access list for this big block.
186    #[serde(default, skip_serializing_if = "Option::is_none")]
187    pub merged_block_access_list: Option<Bytes>,
188}
189
190impl ExecutionPayload for BigBlockData<ExecutionData> {
191    fn parent_hash(&self) -> B256 {
192        self.env_switches[0].parent_hash()
193    }
194
195    fn block_hash(&self) -> B256 {
196        self.env_switches.last().unwrap().block_hash()
197    }
198
199    fn block_number(&self) -> u64 {
200        self.block_number
201    }
202
203    fn withdrawals(&self) -> Option<&Vec<Withdrawal>> {
204        self.env_switches[0].withdrawals()
205    }
206
207    fn block_access_list(&self) -> Option<&Bytes> {
208        self.merged_block_access_list.as_ref()
209    }
210
211    fn parent_beacon_block_root(&self) -> Option<B256> {
212        self.env_switches[0].parent_beacon_block_root()
213    }
214
215    fn timestamp(&self) -> u64 {
216        self.env_switches[0].timestamp()
217    }
218
219    fn gas_used(&self) -> u64 {
220        self.env_switches.iter().map(|data| data.gas_used()).sum()
221    }
222
223    fn gas_limit(&self) -> u64 {
224        self.env_switches.iter().map(|data| data.gas_limit()).sum()
225    }
226
227    fn transaction_count(&self) -> usize {
228        self.env_switches.iter().map(|data| data.transaction_count()).sum()
229    }
230
231    fn slot_number(&self) -> Option<u64> {
232        self.env_switches[0].payload.slot_number()
233    }
234}
235
236/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
237/// consensus layer).
238#[derive(Debug)]
239pub enum BeaconEngineMessage<Payload: PayloadTypes> {
240    /// Message with new payload.
241    NewPayload {
242        /// The execution payload received by Engine API.
243        payload: Payload::ExecutionData,
244        /// The sender for returning payload status result.
245        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
246    },
247    /// Message with new payload used by `reth_newPayload` endpoint.
248    ///
249    /// Supports independent control over waiting for persistence and cache locks before
250    /// processing, providing unbiased timing measurements when enabled.
251    ///
252    /// Returns detailed timing breakdown alongside the payload status.
253    RethNewPayload {
254        /// The execution payload received by Engine API.
255        payload: Payload::ExecutionData,
256        /// Whether to wait for in-flight persistence to complete before processing.
257        wait_for_persistence: bool,
258        /// Whether to wait for execution cache and sparse trie locks before processing.
259        wait_for_caches: bool,
260        /// The sender for returning payload status result and timing breakdown.
261        tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
262        /// When this message was enqueued, used to measure backpressure wait time.
263        enqueued_at: Instant,
264    },
265    /// Message with updated forkchoice state.
266    ForkchoiceUpdated {
267        /// The updated forkchoice state.
268        state: ForkchoiceState,
269        /// The payload attributes for block building.
270        payload_attrs: Option<Payload::PayloadAttributes>,
271        /// The sender for returning forkchoice updated result.
272        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
273    },
274}
275
276impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278        match self {
279            Self::NewPayload { payload, .. } => {
280                write!(
281                    f,
282                    "NewPayload(parent: {}, number: {}, hash: {})",
283                    payload.parent_hash(),
284                    payload.block_number(),
285                    payload.block_hash()
286                )
287            }
288            Self::RethNewPayload { payload, .. } => {
289                write!(
290                    f,
291                    "RethNewPayload(parent: {}, number: {}, hash: {})",
292                    payload.parent_hash(),
293                    payload.block_number(),
294                    payload.block_hash()
295                )
296            }
297            Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
298                // we don't want to print the entire payload attributes, because for OP this
299                // includes all txs
300                write!(
301                    f,
302                    "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
303                    payload_attrs.is_some()
304                )
305            }
306        }
307    }
308}
309
310/// A cloneable sender type that can be used to send engine API messages.
311///
312/// This type mirrors consensus related functions of the engine API.
313#[derive(Debug, Clone)]
314pub struct ConsensusEngineHandle<Payload>
315where
316    Payload: PayloadTypes,
317{
318    to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
319}
320
321impl<Payload> ConsensusEngineHandle<Payload>
322where
323    Payload: PayloadTypes,
324{
325    /// Creates a new beacon consensus engine handle.
326    pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
327        Self { to_engine }
328    }
329
330    /// Sends a new payload message to the beacon consensus engine and waits for a response.
331    ///
332    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
333    pub async fn new_payload(
334        &self,
335        payload: Payload::ExecutionData,
336    ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
337        let (tx, rx) = oneshot::channel();
338        let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
339        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
340    }
341
342    /// Sends a new payload message used by `reth_newPayload` endpoint.
343    ///
344    /// `wait_for_persistence`: waits for in-flight persistence to complete.
345    /// `wait_for_caches`: waits for execution cache and sparse trie locks.
346    ///
347    /// Returns detailed timing breakdown alongside the payload status.
348    pub async fn reth_new_payload(
349        &self,
350        payload: Payload::ExecutionData,
351        wait_for_persistence: bool,
352        wait_for_caches: bool,
353    ) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
354        let (tx, rx) = oneshot::channel();
355        let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload {
356            payload,
357            wait_for_persistence,
358            wait_for_caches,
359            tx,
360            enqueued_at: Instant::now(),
361        });
362        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
363    }
364
365    /// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
366    ///
367    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
368    pub async fn fork_choice_updated(
369        &self,
370        state: ForkchoiceState,
371        payload_attrs: Option<Payload::PayloadAttributes>,
372    ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
373        Ok(self
374            .send_fork_choice_updated(state, payload_attrs)
375            .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
376            .await?
377            .map_err(BeaconForkChoiceUpdateError::internal)?
378            .await?)
379    }
380
381    /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
382    /// wait for a response.
383    fn send_fork_choice_updated(
384        &self,
385        state: ForkchoiceState,
386        payload_attrs: Option<Payload::PayloadAttributes>,
387    ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
388        let (tx, rx) = oneshot::channel();
389        let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
390            state,
391            payload_attrs,
392            tx,
393        });
394        rx
395    }
396}