1use crate::{
2 error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, EngineApiMessageVersion,
3 ExecutionPayload, ForkchoiceStatus,
4};
5use alloy_rpc_types_engine::{
6 ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
7 PayloadStatus, PayloadStatusEnum,
8};
9use core::{
10 fmt::{self, Display},
11 future::Future,
12 pin::Pin,
13 task::{ready, Context, Poll},
14};
15use futures::{future::Either, FutureExt, TryFutureExt};
16use reth_errors::RethResult;
17use reth_payload_builder_primitives::PayloadBuilderError;
18use reth_payload_primitives::PayloadTypes;
19use tokio::sync::{mpsc::UnboundedSender, oneshot};
20
21#[must_use = "futures do nothing unless you `.await` or poll them"]
25#[derive(Debug)]
26pub struct OnForkChoiceUpdated {
27 forkchoice_status: ForkchoiceStatus,
32 fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
34}
35
36impl OnForkChoiceUpdated {
39 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
41 self.forkchoice_status
42 }
43
44 pub fn syncing() -> Self {
46 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
47 Self {
48 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
49 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
50 }
51 }
52
53 pub fn valid(status: PayloadStatus) -> Self {
56 Self {
57 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
58 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
59 }
60 }
61
62 pub fn with_invalid(status: PayloadStatus) -> Self {
65 Self {
66 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
67 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
68 }
69 }
70
71 pub fn invalid_state() -> Self {
74 Self {
75 forkchoice_status: ForkchoiceStatus::Invalid,
76 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
77 }
78 }
79
80 pub fn invalid_payload_attributes() -> Self {
83 Self {
84 forkchoice_status: ForkchoiceStatus::Valid,
86 fut: Either::Left(futures::future::ready(Err(
87 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
88 ))),
89 }
90 }
91
92 pub const fn updated_with_pending_payload_id(
94 payload_status: PayloadStatus,
95 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
96 ) -> Self {
97 Self {
98 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
99 fut: Either::Right(PendingPayloadId {
100 payload_status: Some(payload_status),
101 pending_payload_id,
102 }),
103 }
104 }
105}
106
107impl Future for OnForkChoiceUpdated {
108 type Output = ForkChoiceUpdateResult;
109
110 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111 self.get_mut().fut.poll_unpin(cx)
112 }
113}
114
115#[derive(Debug)]
118struct PendingPayloadId {
119 payload_status: Option<PayloadStatus>,
120 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
121}
122
123impl Future for PendingPayloadId {
124 type Output = ForkChoiceUpdateResult;
125
126 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
127 let this = self.get_mut();
128 let res = ready!(this.pending_payload_id.poll_unpin(cx));
129 match res {
130 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
131 payload_status: this.payload_status.take().expect("Polled after completion"),
132 payload_id: Some(payload_id),
133 })),
134 Err(_) | Ok(Err(_)) => {
135 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
137 }
138 }
139 }
140}
141
142#[derive(Debug)]
145pub enum BeaconEngineMessage<Payload: PayloadTypes> {
146 NewPayload {
148 payload: Payload::ExecutionData,
150 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
152 },
153 ForkchoiceUpdated {
155 state: ForkchoiceState,
157 payload_attrs: Option<Payload::PayloadAttributes>,
159 version: EngineApiMessageVersion,
161 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
163 },
164}
165
166impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168 match self {
169 Self::NewPayload { payload, .. } => {
170 write!(
171 f,
172 "NewPayload(parent: {}, number: {}, hash: {})",
173 payload.parent_hash(),
174 payload.block_number(),
175 payload.block_hash()
176 )
177 }
178 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
179 write!(
182 f,
183 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
184 payload_attrs.is_some()
185 )
186 }
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
195pub struct BeaconConsensusEngineHandle<Payload>
196where
197 Payload: PayloadTypes,
198{
199 to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
200}
201
202impl<Payload> BeaconConsensusEngineHandle<Payload>
203where
204 Payload: PayloadTypes,
205{
206 pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
208 Self { to_engine }
209 }
210
211 pub async fn new_payload(
215 &self,
216 payload: Payload::ExecutionData,
217 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
218 let (tx, rx) = oneshot::channel();
219 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
220 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
221 }
222
223 pub async fn fork_choice_updated(
227 &self,
228 state: ForkchoiceState,
229 payload_attrs: Option<Payload::PayloadAttributes>,
230 version: EngineApiMessageVersion,
231 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
232 Ok(self
233 .send_fork_choice_updated(state, payload_attrs, version)
234 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
235 .await?
236 .map_err(BeaconForkChoiceUpdateError::internal)?
237 .await?)
238 }
239
240 fn send_fork_choice_updated(
243 &self,
244 state: ForkchoiceState,
245 payload_attrs: Option<Payload::PayloadAttributes>,
246 version: EngineApiMessageVersion,
247 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
248 let (tx, rx) = oneshot::channel();
249 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
250 state,
251 payload_attrs,
252 tx,
253 version,
254 });
255 rx
256 }
257}