reth_engine_primitives/
message.rs

1use crate::{
2    error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, EngineApiMessageVersion,
3    EngineTypes, ExecutionPayload, ForkchoiceStatus,
4};
5use alloy_rpc_types_engine::{
6    ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
7    PayloadStatus, PayloadStatusEnum,
8};
9use core::{
10    fmt::{self, Display},
11    future::Future,
12    pin::Pin,
13    task::{ready, Context, Poll},
14};
15use futures::{future::Either, FutureExt, TryFutureExt};
16use reth_errors::RethResult;
17use reth_payload_builder_primitives::PayloadBuilderError;
18use tokio::sync::{mpsc::UnboundedSender, oneshot};
19
20/// Represents the outcome of forkchoice update.
21///
22/// This is a future that resolves to [`ForkChoiceUpdateResult`]
23#[must_use = "futures do nothing unless you `.await` or poll them"]
24#[derive(Debug)]
25pub struct OnForkChoiceUpdated {
26    /// Represents the status of the forkchoice update.
27    ///
28    /// Note: This is separate from the response `fut`, because we still can return an error
29    /// depending on the payload attributes, even if the forkchoice update itself is valid.
30    forkchoice_status: ForkchoiceStatus,
31    /// Returns the result of the forkchoice update.
32    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
33}
34
35// === impl OnForkChoiceUpdated ===
36
37impl OnForkChoiceUpdated {
38    /// Returns the determined status of the received `ForkchoiceState`.
39    pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
40        self.forkchoice_status
41    }
42
43    /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
44    pub fn syncing() -> Self {
45        let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
46        Self {
47            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
48            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
49        }
50    }
51
52    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
53    /// payload attributes were provided.
54    pub fn valid(status: PayloadStatus) -> Self {
55        Self {
56            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
57            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
58        }
59    }
60
61    /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
62    /// forkchoice update failed due to an invalid payload.
63    pub fn with_invalid(status: PayloadStatus) -> Self {
64        Self {
65            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
66            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
67        }
68    }
69
70    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
71    /// given state is considered invalid
72    pub fn invalid_state() -> Self {
73        Self {
74            forkchoice_status: ForkchoiceStatus::Invalid,
75            fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
76        }
77    }
78
79    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
80    /// payload attributes were invalid.
81    pub fn invalid_payload_attributes() -> Self {
82        Self {
83            // This is valid because this is only reachable if the state and payload is valid
84            forkchoice_status: ForkchoiceStatus::Valid,
85            fut: Either::Left(futures::future::ready(Err(
86                ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
87            ))),
88        }
89    }
90
91    /// If the forkchoice update was successful and no payload attributes were provided, this method
92    pub const fn updated_with_pending_payload_id(
93        payload_status: PayloadStatus,
94        pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
95    ) -> Self {
96        Self {
97            forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
98            fut: Either::Right(PendingPayloadId {
99                payload_status: Some(payload_status),
100                pending_payload_id,
101            }),
102        }
103    }
104}
105
106impl Future for OnForkChoiceUpdated {
107    type Output = ForkChoiceUpdateResult;
108
109    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110        self.get_mut().fut.poll_unpin(cx)
111    }
112}
113
114/// A future that returns the payload id of a yet to be initiated payload job after a successful
115/// forkchoice update
116#[derive(Debug)]
117struct PendingPayloadId {
118    payload_status: Option<PayloadStatus>,
119    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
120}
121
122impl Future for PendingPayloadId {
123    type Output = ForkChoiceUpdateResult;
124
125    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126        let this = self.get_mut();
127        let res = ready!(this.pending_payload_id.poll_unpin(cx));
128        match res {
129            Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
130                payload_status: this.payload_status.take().expect("Polled after completion"),
131                payload_id: Some(payload_id),
132            })),
133            Err(_) | Ok(Err(_)) => {
134                // failed to initiate a payload build job
135                Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
136            }
137        }
138    }
139}
140
141/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
142/// consensus layer).
143#[derive(Debug)]
144pub enum BeaconEngineMessage<Engine: EngineTypes> {
145    /// Message with new payload.
146    NewPayload {
147        /// The execution payload received by Engine API.
148        payload: Engine::ExecutionData,
149        /// The sender for returning payload status result.
150        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
151    },
152    /// Message with updated forkchoice state.
153    ForkchoiceUpdated {
154        /// The updated forkchoice state.
155        state: ForkchoiceState,
156        /// The payload attributes for block building.
157        payload_attrs: Option<Engine::PayloadAttributes>,
158        /// The Engine API Version.
159        version: EngineApiMessageVersion,
160        /// The sender for returning forkchoice updated result.
161        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
162    },
163    /// Message with exchanged transition configuration.
164    TransitionConfigurationExchanged,
165}
166
167impl<Engine: EngineTypes> Display for BeaconEngineMessage<Engine> {
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169        match self {
170            Self::NewPayload { payload, .. } => {
171                write!(
172                    f,
173                    "NewPayload(parent: {}, number: {}, hash: {})",
174                    payload.parent_hash(),
175                    payload.block_number(),
176                    payload.block_hash()
177                )
178            }
179            Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
180                // we don't want to print the entire payload attributes, because for OP this
181                // includes all txs
182                write!(
183                    f,
184                    "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
185                    payload_attrs.is_some()
186                )
187            }
188            Self::TransitionConfigurationExchanged => {
189                write!(f, "TransitionConfigurationExchanged")
190            }
191        }
192    }
193}
194
195/// A clonable sender type that can be used to send engine API messages.
196///
197/// This type mirrors consensus related functions of the engine API.
198#[derive(Debug, Clone)]
199pub struct BeaconConsensusEngineHandle<Engine>
200where
201    Engine: EngineTypes,
202{
203    to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
204}
205
206impl<Engine> BeaconConsensusEngineHandle<Engine>
207where
208    Engine: EngineTypes,
209{
210    /// Creates a new beacon consensus engine handle.
211    pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
212        Self { to_engine }
213    }
214
215    /// Sends a new payload message to the beacon consensus engine and waits for a response.
216    ///
217    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
218    pub async fn new_payload(
219        &self,
220        payload: Engine::ExecutionData,
221    ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
222        let (tx, rx) = oneshot::channel();
223        let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
224        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
225    }
226
227    /// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
228    ///
229    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
230    pub async fn fork_choice_updated(
231        &self,
232        state: ForkchoiceState,
233        payload_attrs: Option<Engine::PayloadAttributes>,
234        version: EngineApiMessageVersion,
235    ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
236        Ok(self
237            .send_fork_choice_updated(state, payload_attrs, version)
238            .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
239            .await?
240            .map_err(BeaconForkChoiceUpdateError::internal)?
241            .await?)
242    }
243
244    /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
245    /// wait for a response.
246    fn send_fork_choice_updated(
247        &self,
248        state: ForkchoiceState,
249        payload_attrs: Option<Engine::PayloadAttributes>,
250        version: EngineApiMessageVersion,
251    ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
252        let (tx, rx) = oneshot::channel();
253        let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
254            state,
255            payload_attrs,
256            tx,
257            version,
258        });
259        rx
260    }
261
262    /// Sends a transition configuration exchange message to the beacon consensus engine.
263    ///
264    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
265    ///
266    /// This only notifies about the exchange. The actual exchange is done by the engine API impl
267    /// itself.
268    pub fn transition_configuration_exchanged(&self) {
269        let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
270    }
271}