Skip to main content

reth_engine_primitives/
message.rs

1use crate::{
2    error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_rpc_types_engine::{
5    ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
6    PayloadStatus, PayloadStatusEnum,
7};
8use core::{
9    fmt::{self, Display},
10    future::Future,
11    pin::Pin,
12    task::{ready, Context, Poll},
13};
14use futures::{future::Either, FutureExt, TryFutureExt};
15use reth_errors::RethResult;
16use reth_payload_builder_primitives::PayloadBuilderError;
17use reth_payload_primitives::PayloadTypes;
18use std::time::{Duration, Instant};
19use tokio::sync::{mpsc::UnboundedSender, oneshot};
20
21/// Type alias for backwards compat
22#[deprecated(note = "Use ConsensusEngineHandle instead")]
23pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
24
25/// Represents the outcome of forkchoice update.
26///
27/// This is a future that resolves to [`ForkChoiceUpdateResult`]
28#[must_use = "futures do nothing unless you `.await` or poll them"]
29#[derive(Debug)]
30pub struct OnForkChoiceUpdated {
31    /// Represents the status of the forkchoice update.
32    ///
33    /// Note: This is separate from the response `fut`, because we still can return an error
34    /// depending on the payload attributes, even if the forkchoice update itself is valid.
35    forkchoice_status: ForkchoiceStatus,
36    /// Returns the result of the forkchoice update.
37    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
38}
39
40// === impl OnForkChoiceUpdated ===
41
42impl OnForkChoiceUpdated {
43    /// Returns the determined status of the received `ForkchoiceState`.
44    pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
45        self.forkchoice_status
46    }
47
48    /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
49    pub fn syncing() -> Self {
50        let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
51        Self {
52            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
53            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
54        }
55    }
56
57    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
58    /// payload attributes were provided.
59    pub fn valid(status: PayloadStatus) -> Self {
60        Self {
61            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
62            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
63        }
64    }
65
66    /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
67    /// forkchoice update failed due to an invalid payload.
68    pub fn with_invalid(status: PayloadStatus) -> Self {
69        Self {
70            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
71            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
72        }
73    }
74
75    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
76    /// given state is considered invalid
77    pub fn invalid_state() -> Self {
78        Self {
79            forkchoice_status: ForkchoiceStatus::Invalid,
80            fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
81        }
82    }
83
84    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
85    /// payload attributes were invalid.
86    pub fn invalid_payload_attributes() -> Self {
87        Self {
88            // This is valid because this is only reachable if the state and payload is valid
89            forkchoice_status: ForkchoiceStatus::Valid,
90            fut: Either::Left(futures::future::ready(Err(
91                ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
92            ))),
93        }
94    }
95
96    /// If the forkchoice update was successful and no payload attributes were provided, this method
97    pub const fn updated_with_pending_payload_id(
98        payload_status: PayloadStatus,
99        pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
100    ) -> Self {
101        Self {
102            forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
103            fut: Either::Right(PendingPayloadId {
104                payload_status: Some(payload_status),
105                pending_payload_id,
106            }),
107        }
108    }
109}
110
111impl Future for OnForkChoiceUpdated {
112    type Output = ForkChoiceUpdateResult;
113
114    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115        self.get_mut().fut.poll_unpin(cx)
116    }
117}
118
119/// A future that returns the payload id of a yet to be initiated payload job after a successful
120/// forkchoice update
121#[derive(Debug)]
122struct PendingPayloadId {
123    payload_status: Option<PayloadStatus>,
124    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
125}
126
127impl Future for PendingPayloadId {
128    type Output = ForkChoiceUpdateResult;
129
130    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131        let this = self.get_mut();
132        let res = ready!(this.pending_payload_id.poll_unpin(cx));
133        match res {
134            Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
135                payload_status: this.payload_status.take().expect("Polled after completion"),
136                payload_id: Some(payload_id),
137            })),
138            Err(_) | Ok(Err(_)) => {
139                // failed to initiate a payload build job
140                Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
141            }
142        }
143    }
144}
145
/// Breakdown of where time was spent while serving a `reth_newPayload` request.
#[derive(Clone, Copy, Debug)]
pub struct NewPayloadTimings {
    /// Execution latency measured on the server side.
    pub latency: Duration,
    /// Total time spent waiting on persistence.
    ///
    /// Covers the time the message sat in the queue because of persistence backpressure and, if
    /// `wait_for_persistence` was requested, the explicit wait for in-flight persistence to
    /// complete.
    pub persistence_wait: Duration,
    /// Time spent waiting for the execution cache lock.
    ///
    /// `None` if waiting for the execution cache was not requested.
    pub execution_cache_wait: Option<Duration>,
    /// Time spent waiting for the sparse trie cache lock.
    ///
    /// `None` if waiting for the sparse trie cache was not requested.
    pub sparse_trie_wait: Option<Duration>,
}
164
165/// Additional data for big block payloads that merge multiple real blocks.
166///
167/// This is used by the `reth_newPayload` endpoint to pass environment switches
168/// and prior block hashes needed for correct multi-segment execution.
169#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
170pub struct BigBlockData<ExecutionData> {
171    /// Environment switches at block boundaries.
172    /// Each entry is `(cumulative_tx_count, execution_data_of_next_block)`.
173    ///
174    /// The first entry at index 0 represents the **original unmutated** base block's
175    /// `ExecutionData`, which must be used to derive the initial EVM environment.
176    pub env_switches: Vec<(usize, ExecutionData)>,
177    /// Block number → real block hash for blocks covered by previous big blocks in a sequence.
178    /// When replaying chained big blocks, the BLOCKHASH opcode needs real hashes for blocks
179    /// that were merged into earlier big blocks (and thus not individually persisted).
180    pub prior_block_hashes: Vec<(u64, alloy_primitives::B256)>,
181}
182
183impl<T> Default for BigBlockData<T> {
184    fn default() -> Self {
185        Self { env_switches: Vec::new(), prior_block_hashes: Vec::new() }
186    }
187}
188
189/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
190/// consensus layer).
191#[derive(Debug)]
192pub enum BeaconEngineMessage<Payload: PayloadTypes> {
193    /// Message with new payload.
194    NewPayload {
195        /// The execution payload received by Engine API.
196        payload: Payload::ExecutionData,
197        /// The sender for returning payload status result.
198        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
199        /// When this message was enqueued, used to measure backpressure wait time.
200        enqueued_at: Instant,
201    },
202    /// Message with new payload used by `reth_newPayload` endpoint.
203    ///
204    /// Supports independent control over waiting for persistence and cache locks before
205    /// processing, providing unbiased timing measurements when enabled.
206    ///
207    /// Returns detailed timing breakdown alongside the payload status.
208    RethNewPayload {
209        /// The execution payload received by Engine API.
210        payload: Payload::ExecutionData,
211        /// Whether to wait for in-flight persistence to complete before processing.
212        wait_for_persistence: bool,
213        /// Whether to wait for execution cache and sparse trie locks before processing.
214        wait_for_caches: bool,
215        /// The sender for returning payload status result and timing breakdown.
216        tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
217        /// When this message was enqueued, used to measure backpressure wait time.
218        enqueued_at: Instant,
219    },
220    /// Message with updated forkchoice state.
221    ForkchoiceUpdated {
222        /// The updated forkchoice state.
223        state: ForkchoiceState,
224        /// The payload attributes for block building.
225        payload_attrs: Option<Payload::PayloadAttributes>,
226        /// The sender for returning forkchoice updated result.
227        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
228    },
229}
230
231impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        match self {
234            Self::NewPayload { payload, .. } => {
235                write!(
236                    f,
237                    "NewPayload(parent: {}, number: {}, hash: {})",
238                    payload.parent_hash(),
239                    payload.block_number(),
240                    payload.block_hash()
241                )
242            }
243            Self::RethNewPayload { payload, .. } => {
244                write!(
245                    f,
246                    "RethNewPayload(parent: {}, number: {}, hash: {})",
247                    payload.parent_hash(),
248                    payload.block_number(),
249                    payload.block_hash()
250                )
251            }
252            Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
253                // we don't want to print the entire payload attributes, because for OP this
254                // includes all txs
255                write!(
256                    f,
257                    "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
258                    payload_attrs.is_some()
259                )
260            }
261        }
262    }
263}
264
265/// A cloneable sender type that can be used to send engine API messages.
266///
267/// This type mirrors consensus related functions of the engine API.
268#[derive(Debug, Clone)]
269pub struct ConsensusEngineHandle<Payload>
270where
271    Payload: PayloadTypes,
272{
273    to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
274}
275
276impl<Payload> ConsensusEngineHandle<Payload>
277where
278    Payload: PayloadTypes,
279{
280    /// Creates a new beacon consensus engine handle.
281    pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
282        Self { to_engine }
283    }
284
285    /// Sends a new payload message to the beacon consensus engine and waits for a response.
286    ///
287    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
288    pub async fn new_payload(
289        &self,
290        payload: Payload::ExecutionData,
291    ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
292        let (tx, rx) = oneshot::channel();
293        let _ = self.to_engine.send(BeaconEngineMessage::NewPayload {
294            payload,
295            tx,
296            enqueued_at: Instant::now(),
297        });
298        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
299    }
300
301    /// Sends a new payload message used by `reth_newPayload` endpoint.
302    ///
303    /// `wait_for_persistence`: waits for in-flight persistence to complete.
304    /// `wait_for_caches`: waits for execution cache and sparse trie locks.
305    ///
306    /// Returns detailed timing breakdown alongside the payload status.
307    pub async fn reth_new_payload(
308        &self,
309        payload: Payload::ExecutionData,
310        wait_for_persistence: bool,
311        wait_for_caches: bool,
312    ) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
313        let (tx, rx) = oneshot::channel();
314        let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload {
315            payload,
316            wait_for_persistence,
317            wait_for_caches,
318            tx,
319            enqueued_at: Instant::now(),
320        });
321        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
322    }
323
324    /// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
325    ///
326    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
327    pub async fn fork_choice_updated(
328        &self,
329        state: ForkchoiceState,
330        payload_attrs: Option<Payload::PayloadAttributes>,
331    ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
332        Ok(self
333            .send_fork_choice_updated(state, payload_attrs)
334            .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
335            .await?
336            .map_err(BeaconForkChoiceUpdateError::internal)?
337            .await?)
338    }
339
340    /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
341    /// wait for a response.
342    fn send_fork_choice_updated(
343        &self,
344        state: ForkchoiceState,
345        payload_attrs: Option<Payload::PayloadAttributes>,
346    ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
347        let (tx, rx) = oneshot::channel();
348        let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
349            state,
350            payload_attrs,
351            tx,
352        });
353        rx
354    }
355}