1use crate::{
2 error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_rpc_types_engine::{
5 ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
6 PayloadStatus, PayloadStatusEnum,
7};
8use core::{
9 fmt::{self, Display},
10 future::Future,
11 pin::Pin,
12 task::{ready, Context, Poll},
13};
14use futures::{future::Either, FutureExt, TryFutureExt};
15use reth_errors::RethResult;
16use reth_payload_builder_primitives::PayloadBuilderError;
17use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes};
18use tokio::sync::{mpsc::UnboundedSender, oneshot};
19
/// Deprecated alias for [`ConsensusEngineHandle`].
///
/// New code should name [`ConsensusEngineHandle`] directly, as the deprecation note says.
#[deprecated(note = "Use ConsensusEngineHandle instead")]
pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
23
/// Outcome of a forkchoice update, exposed as a future.
///
/// Awaiting this type yields a [`ForkChoiceUpdateResult`]. The [`ForkchoiceStatus`] is fixed when
/// the value is constructed and can be read through
/// [`OnForkChoiceUpdated::forkchoice_status`] without polling the future.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnForkChoiceUpdated {
    /// Status recorded at construction time; it does not change when the future is polled.
    forkchoice_status: ForkchoiceStatus,
    /// Either an already-resolved result (`Left`) or a [`PendingPayloadId`] that still waits for
    /// the payload id to arrive on a oneshot channel (`Right`).
    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
}
38
39impl OnForkChoiceUpdated {
42 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
44 self.forkchoice_status
45 }
46
47 pub fn syncing() -> Self {
49 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
50 Self {
51 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
52 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
53 }
54 }
55
56 pub fn valid(status: PayloadStatus) -> Self {
59 Self {
60 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
61 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
62 }
63 }
64
65 pub fn with_invalid(status: PayloadStatus) -> Self {
68 Self {
69 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
70 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
71 }
72 }
73
74 pub fn invalid_state() -> Self {
77 Self {
78 forkchoice_status: ForkchoiceStatus::Invalid,
79 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
80 }
81 }
82
83 pub fn invalid_payload_attributes() -> Self {
86 Self {
87 forkchoice_status: ForkchoiceStatus::Valid,
89 fut: Either::Left(futures::future::ready(Err(
90 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
91 ))),
92 }
93 }
94
95 pub const fn updated_with_pending_payload_id(
97 payload_status: PayloadStatus,
98 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
99 ) -> Self {
100 Self {
101 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
102 fut: Either::Right(PendingPayloadId {
103 payload_status: Some(payload_status),
104 pending_payload_id,
105 }),
106 }
107 }
108}
109
110impl Future for OnForkChoiceUpdated {
111 type Output = ForkChoiceUpdateResult;
112
113 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
114 self.get_mut().fut.poll_unpin(cx)
115 }
116}
117
/// Future that waits for a payload id on a oneshot channel and, once it arrives, pairs it with
/// the stored payload status.
#[derive(Debug)]
struct PendingPayloadId {
    /// Payload status to return on success; taken out (left as `None`) when the future resolves
    /// successfully.
    payload_status: Option<PayloadStatus>,
    /// Receiving half of the channel on which the payload id, or a [`PayloadBuilderError`], is
    /// delivered.
    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
}
125
126impl Future for PendingPayloadId {
127 type Output = ForkChoiceUpdateResult;
128
129 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 let this = self.get_mut();
131 let res = ready!(this.pending_payload_id.poll_unpin(cx));
132 match res {
133 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
134 payload_status: this.payload_status.take().expect("Polled after completion"),
135 payload_id: Some(payload_id),
136 })),
137 Err(_) | Ok(Err(_)) => {
138 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
140 }
141 }
142 }
143}
144
/// Messages carried over the channel held by [`ConsensusEngineHandle`].
///
/// Each variant bundles the request data with a oneshot sender on which the response is
/// delivered back to the requester.
#[derive(Debug)]
pub enum BeaconEngineMessage<Payload: PayloadTypes> {
    /// Request to process a new payload.
    NewPayload {
        /// The execution payload data to process.
        payload: Payload::ExecutionData,
        /// Channel on which the resulting [`PayloadStatus`] (or error) is sent back.
        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
    },
    /// Request to apply a forkchoice update.
    ForkchoiceUpdated {
        /// The forkchoice state supplied with the request.
        state: ForkchoiceState,
        /// Optional payload attributes supplied with the request.
        payload_attrs: Option<Payload::PayloadAttributes>,
        /// Engine API message version passed along with the request.
        version: EngineApiMessageVersion,
        /// Channel on which the [`OnForkChoiceUpdated`] outcome (or error) is sent back.
        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
    },
}
168
169impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 match self {
172 Self::NewPayload { payload, .. } => {
173 write!(
174 f,
175 "NewPayload(parent: {}, number: {}, hash: {})",
176 payload.parent_hash(),
177 payload.block_number(),
178 payload.block_hash()
179 )
180 }
181 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
182 write!(
185 f,
186 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
187 payload_attrs.is_some()
188 )
189 }
190 }
191 }
192}
193
/// Cloneable handle for sending [`BeaconEngineMessage`]s to the engine.
///
/// The handle wraps the sending half of an unbounded channel; its methods pair every request
/// with a oneshot channel on which the response is awaited.
#[derive(Debug, Clone)]
pub struct ConsensusEngineHandle<Payload>
where
    Payload: PayloadTypes,
{
    /// Sending half of the channel over which messages are delivered.
    to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
}
204
205impl<Payload> ConsensusEngineHandle<Payload>
206where
207 Payload: PayloadTypes,
208{
209 pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
211 Self { to_engine }
212 }
213
214 pub async fn new_payload(
218 &self,
219 payload: Payload::ExecutionData,
220 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
221 let (tx, rx) = oneshot::channel();
222 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
223 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
224 }
225
226 pub async fn fork_choice_updated(
230 &self,
231 state: ForkchoiceState,
232 payload_attrs: Option<Payload::PayloadAttributes>,
233 version: EngineApiMessageVersion,
234 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
235 Ok(self
236 .send_fork_choice_updated(state, payload_attrs, version)
237 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
238 .await?
239 .map_err(BeaconForkChoiceUpdateError::internal)?
240 .await?)
241 }
242
243 fn send_fork_choice_updated(
246 &self,
247 state: ForkchoiceState,
248 payload_attrs: Option<Payload::PayloadAttributes>,
249 version: EngineApiMessageVersion,
250 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
251 let (tx, rx) = oneshot::channel();
252 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
253 state,
254 payload_attrs,
255 tx,
256 version,
257 });
258 rx
259 }
260}