reth_engine_primitives/
message.rs

1use crate::{
2    error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_rpc_types_engine::{
5    ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
6    PayloadStatus, PayloadStatusEnum,
7};
8use core::{
9    fmt::{self, Display},
10    future::Future,
11    pin::Pin,
12    task::{ready, Context, Poll},
13};
14use futures::{future::Either, FutureExt, TryFutureExt};
15use reth_errors::RethResult;
16use reth_payload_builder_primitives::PayloadBuilderError;
17use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes};
18use tokio::sync::{mpsc::UnboundedSender, oneshot};
19
20/// Type alias for backwards compat
21#[deprecated(note = "Use ConsensusEngineHandle instead")]
22pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
23
24/// Represents the outcome of forkchoice update.
25///
26/// This is a future that resolves to [`ForkChoiceUpdateResult`]
27#[must_use = "futures do nothing unless you `.await` or poll them"]
28#[derive(Debug)]
29pub struct OnForkChoiceUpdated {
30    /// Represents the status of the forkchoice update.
31    ///
32    /// Note: This is separate from the response `fut`, because we still can return an error
33    /// depending on the payload attributes, even if the forkchoice update itself is valid.
34    forkchoice_status: ForkchoiceStatus,
35    /// Returns the result of the forkchoice update.
36    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
37}
38
39// === impl OnForkChoiceUpdated ===
40
41impl OnForkChoiceUpdated {
42    /// Returns the determined status of the received `ForkchoiceState`.
43    pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
44        self.forkchoice_status
45    }
46
47    /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
48    pub fn syncing() -> Self {
49        let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
50        Self {
51            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
52            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
53        }
54    }
55
56    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
57    /// payload attributes were provided.
58    pub fn valid(status: PayloadStatus) -> Self {
59        Self {
60            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
61            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
62        }
63    }
64
65    /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
66    /// forkchoice update failed due to an invalid payload.
67    pub fn with_invalid(status: PayloadStatus) -> Self {
68        Self {
69            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
70            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
71        }
72    }
73
74    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
75    /// given state is considered invalid
76    pub fn invalid_state() -> Self {
77        Self {
78            forkchoice_status: ForkchoiceStatus::Invalid,
79            fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
80        }
81    }
82
83    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
84    /// payload attributes were invalid.
85    pub fn invalid_payload_attributes() -> Self {
86        Self {
87            // This is valid because this is only reachable if the state and payload is valid
88            forkchoice_status: ForkchoiceStatus::Valid,
89            fut: Either::Left(futures::future::ready(Err(
90                ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
91            ))),
92        }
93    }
94
95    /// If the forkchoice update was successful and no payload attributes were provided, this method
96    pub const fn updated_with_pending_payload_id(
97        payload_status: PayloadStatus,
98        pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
99    ) -> Self {
100        Self {
101            forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
102            fut: Either::Right(PendingPayloadId {
103                payload_status: Some(payload_status),
104                pending_payload_id,
105            }),
106        }
107    }
108}
109
110impl Future for OnForkChoiceUpdated {
111    type Output = ForkChoiceUpdateResult;
112
113    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114        self.get_mut().fut.poll_unpin(cx)
115    }
116}
117
118/// A future that returns the payload id of a yet to be initiated payload job after a successful
119/// forkchoice update
120#[derive(Debug)]
121struct PendingPayloadId {
122    payload_status: Option<PayloadStatus>,
123    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
124}
125
126impl Future for PendingPayloadId {
127    type Output = ForkChoiceUpdateResult;
128
129    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        let this = self.get_mut();
131        let res = ready!(this.pending_payload_id.poll_unpin(cx));
132        match res {
133            Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
134                payload_status: this.payload_status.take().expect("Polled after completion"),
135                payload_id: Some(payload_id),
136            })),
137            Err(_) | Ok(Err(_)) => {
138                // failed to initiate a payload build job
139                Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
140            }
141        }
142    }
143}
144
145/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
146/// consensus layer).
147#[derive(Debug)]
148pub enum BeaconEngineMessage<Payload: PayloadTypes> {
149    /// Message with new payload.
150    NewPayload {
151        /// The execution payload received by Engine API.
152        payload: Payload::ExecutionData,
153        /// The sender for returning payload status result.
154        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
155    },
156    /// Message with updated forkchoice state.
157    ForkchoiceUpdated {
158        /// The updated forkchoice state.
159        state: ForkchoiceState,
160        /// The payload attributes for block building.
161        payload_attrs: Option<Payload::PayloadAttributes>,
162        /// The Engine API Version.
163        version: EngineApiMessageVersion,
164        /// The sender for returning forkchoice updated result.
165        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
166    },
167}
168
169impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171        match self {
172            Self::NewPayload { payload, .. } => {
173                write!(
174                    f,
175                    "NewPayload(parent: {}, number: {}, hash: {})",
176                    payload.parent_hash(),
177                    payload.block_number(),
178                    payload.block_hash()
179                )
180            }
181            Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
182                // we don't want to print the entire payload attributes, because for OP this
183                // includes all txs
184                write!(
185                    f,
186                    "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
187                    payload_attrs.is_some()
188                )
189            }
190        }
191    }
192}
193
194/// A cloneable sender type that can be used to send engine API messages.
195///
196/// This type mirrors consensus related functions of the engine API.
197#[derive(Debug, Clone)]
198pub struct ConsensusEngineHandle<Payload>
199where
200    Payload: PayloadTypes,
201{
202    to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
203}
204
205impl<Payload> ConsensusEngineHandle<Payload>
206where
207    Payload: PayloadTypes,
208{
209    /// Creates a new beacon consensus engine handle.
210    pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
211        Self { to_engine }
212    }
213
214    /// Sends a new payload message to the beacon consensus engine and waits for a response.
215    ///
216    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
217    pub async fn new_payload(
218        &self,
219        payload: Payload::ExecutionData,
220    ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
221        let (tx, rx) = oneshot::channel();
222        let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
223        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
224    }
225
226    /// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
227    ///
228    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
229    pub async fn fork_choice_updated(
230        &self,
231        state: ForkchoiceState,
232        payload_attrs: Option<Payload::PayloadAttributes>,
233        version: EngineApiMessageVersion,
234    ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
235        Ok(self
236            .send_fork_choice_updated(state, payload_attrs, version)
237            .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
238            .await?
239            .map_err(BeaconForkChoiceUpdateError::internal)?
240            .await?)
241    }
242
243    /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
244    /// wait for a response.
245    fn send_fork_choice_updated(
246        &self,
247        state: ForkchoiceState,
248        payload_attrs: Option<Payload::PayloadAttributes>,
249        version: EngineApiMessageVersion,
250    ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
251        let (tx, rx) = oneshot::channel();
252        let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
253            state,
254            payload_attrs,
255            tx,
256            version,
257        });
258        rx
259    }
260}