reth_engine_primitives/
message.rs

1use crate::{
2    error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, EngineApiMessageVersion,
3    ExecutionPayload, ForkchoiceStatus,
4};
5use alloy_rpc_types_engine::{
6    ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
7    PayloadStatus, PayloadStatusEnum,
8};
9use core::{
10    fmt::{self, Display},
11    future::Future,
12    pin::Pin,
13    task::{ready, Context, Poll},
14};
15use futures::{future::Either, FutureExt, TryFutureExt};
16use reth_errors::RethResult;
17use reth_payload_builder_primitives::PayloadBuilderError;
18use reth_payload_primitives::PayloadTypes;
19use tokio::sync::{mpsc::UnboundedSender, oneshot};
20
21/// Represents the outcome of forkchoice update.
22///
23/// This is a future that resolves to [`ForkChoiceUpdateResult`]
24#[must_use = "futures do nothing unless you `.await` or poll them"]
25#[derive(Debug)]
26pub struct OnForkChoiceUpdated {
27    /// Represents the status of the forkchoice update.
28    ///
29    /// Note: This is separate from the response `fut`, because we still can return an error
30    /// depending on the payload attributes, even if the forkchoice update itself is valid.
31    forkchoice_status: ForkchoiceStatus,
32    /// Returns the result of the forkchoice update.
33    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
34}
35
36// === impl OnForkChoiceUpdated ===
37
38impl OnForkChoiceUpdated {
39    /// Returns the determined status of the received `ForkchoiceState`.
40    pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
41        self.forkchoice_status
42    }
43
44    /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
45    pub fn syncing() -> Self {
46        let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
47        Self {
48            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
49            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
50        }
51    }
52
53    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
54    /// payload attributes were provided.
55    pub fn valid(status: PayloadStatus) -> Self {
56        Self {
57            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
58            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
59        }
60    }
61
62    /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
63    /// forkchoice update failed due to an invalid payload.
64    pub fn with_invalid(status: PayloadStatus) -> Self {
65        Self {
66            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
67            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
68        }
69    }
70
71    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
72    /// given state is considered invalid
73    pub fn invalid_state() -> Self {
74        Self {
75            forkchoice_status: ForkchoiceStatus::Invalid,
76            fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
77        }
78    }
79
80    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
81    /// payload attributes were invalid.
82    pub fn invalid_payload_attributes() -> Self {
83        Self {
84            // This is valid because this is only reachable if the state and payload is valid
85            forkchoice_status: ForkchoiceStatus::Valid,
86            fut: Either::Left(futures::future::ready(Err(
87                ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
88            ))),
89        }
90    }
91
92    /// If the forkchoice update was successful and no payload attributes were provided, this method
93    pub const fn updated_with_pending_payload_id(
94        payload_status: PayloadStatus,
95        pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
96    ) -> Self {
97        Self {
98            forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
99            fut: Either::Right(PendingPayloadId {
100                payload_status: Some(payload_status),
101                pending_payload_id,
102            }),
103        }
104    }
105}
106
107impl Future for OnForkChoiceUpdated {
108    type Output = ForkChoiceUpdateResult;
109
110    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111        self.get_mut().fut.poll_unpin(cx)
112    }
113}
114
115/// A future that returns the payload id of a yet to be initiated payload job after a successful
116/// forkchoice update
117#[derive(Debug)]
118struct PendingPayloadId {
119    payload_status: Option<PayloadStatus>,
120    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
121}
122
123impl Future for PendingPayloadId {
124    type Output = ForkChoiceUpdateResult;
125
126    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127        let this = self.get_mut();
128        let res = ready!(this.pending_payload_id.poll_unpin(cx));
129        match res {
130            Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
131                payload_status: this.payload_status.take().expect("Polled after completion"),
132                payload_id: Some(payload_id),
133            })),
134            Err(_) | Ok(Err(_)) => {
135                // failed to initiate a payload build job
136                Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
137            }
138        }
139    }
140}
141
142/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
143/// consensus layer).
144#[derive(Debug)]
145pub enum BeaconEngineMessage<Payload: PayloadTypes> {
146    /// Message with new payload.
147    NewPayload {
148        /// The execution payload received by Engine API.
149        payload: Payload::ExecutionData,
150        /// The sender for returning payload status result.
151        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
152    },
153    /// Message with updated forkchoice state.
154    ForkchoiceUpdated {
155        /// The updated forkchoice state.
156        state: ForkchoiceState,
157        /// The payload attributes for block building.
158        payload_attrs: Option<Payload::PayloadAttributes>,
159        /// The Engine API Version.
160        version: EngineApiMessageVersion,
161        /// The sender for returning forkchoice updated result.
162        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
163    },
164}
165
166impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        match self {
169            Self::NewPayload { payload, .. } => {
170                write!(
171                    f,
172                    "NewPayload(parent: {}, number: {}, hash: {})",
173                    payload.parent_hash(),
174                    payload.block_number(),
175                    payload.block_hash()
176                )
177            }
178            Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
179                // we don't want to print the entire payload attributes, because for OP this
180                // includes all txs
181                write!(
182                    f,
183                    "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
184                    payload_attrs.is_some()
185                )
186            }
187        }
188    }
189}
190
191/// A clonable sender type that can be used to send engine API messages.
192///
193/// This type mirrors consensus related functions of the engine API.
194#[derive(Debug, Clone)]
195pub struct BeaconConsensusEngineHandle<Payload>
196where
197    Payload: PayloadTypes,
198{
199    to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
200}
201
202impl<Payload> BeaconConsensusEngineHandle<Payload>
203where
204    Payload: PayloadTypes,
205{
206    /// Creates a new beacon consensus engine handle.
207    pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
208        Self { to_engine }
209    }
210
211    /// Sends a new payload message to the beacon consensus engine and waits for a response.
212    ///
213    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
214    pub async fn new_payload(
215        &self,
216        payload: Payload::ExecutionData,
217    ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
218        let (tx, rx) = oneshot::channel();
219        let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
220        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
221    }
222
223    /// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
224    ///
225    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
226    pub async fn fork_choice_updated(
227        &self,
228        state: ForkchoiceState,
229        payload_attrs: Option<Payload::PayloadAttributes>,
230        version: EngineApiMessageVersion,
231    ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
232        Ok(self
233            .send_fork_choice_updated(state, payload_attrs, version)
234            .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
235            .await?
236            .map_err(BeaconForkChoiceUpdateError::internal)?
237            .await?)
238    }
239
240    /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
241    /// wait for a response.
242    fn send_fork_choice_updated(
243        &self,
244        state: ForkchoiceState,
245        payload_attrs: Option<Payload::PayloadAttributes>,
246        version: EngineApiMessageVersion,
247    ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
248        let (tx, rx) = oneshot::channel();
249        let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
250            state,
251            payload_attrs,
252            tx,
253            version,
254        });
255        rx
256    }
257}