1use crate::{
2 error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, EngineApiMessageVersion,
3 EngineTypes, ExecutionPayload, ForkchoiceStatus,
4};
5use alloy_rpc_types_engine::{
6 ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
7 PayloadStatus, PayloadStatusEnum,
8};
9use core::{
10 fmt::{self, Display},
11 future::Future,
12 pin::Pin,
13 task::{ready, Context, Poll},
14};
15use futures::{future::Either, FutureExt, TryFutureExt};
16use reth_errors::RethResult;
17use reth_payload_builder_primitives::PayloadBuilderError;
18use tokio::sync::{mpsc::UnboundedSender, oneshot};
19
20#[must_use = "futures do nothing unless you `.await` or poll them"]
24#[derive(Debug)]
25pub struct OnForkChoiceUpdated {
26 forkchoice_status: ForkchoiceStatus,
31 fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
33}
34
35impl OnForkChoiceUpdated {
38 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
40 self.forkchoice_status
41 }
42
43 pub fn syncing() -> Self {
45 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
46 Self {
47 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
48 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
49 }
50 }
51
52 pub fn valid(status: PayloadStatus) -> Self {
55 Self {
56 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
57 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
58 }
59 }
60
61 pub fn with_invalid(status: PayloadStatus) -> Self {
64 Self {
65 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
66 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
67 }
68 }
69
70 pub fn invalid_state() -> Self {
73 Self {
74 forkchoice_status: ForkchoiceStatus::Invalid,
75 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
76 }
77 }
78
79 pub fn invalid_payload_attributes() -> Self {
82 Self {
83 forkchoice_status: ForkchoiceStatus::Valid,
85 fut: Either::Left(futures::future::ready(Err(
86 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
87 ))),
88 }
89 }
90
91 pub const fn updated_with_pending_payload_id(
93 payload_status: PayloadStatus,
94 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
95 ) -> Self {
96 Self {
97 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
98 fut: Either::Right(PendingPayloadId {
99 payload_status: Some(payload_status),
100 pending_payload_id,
101 }),
102 }
103 }
104}
105
106impl Future for OnForkChoiceUpdated {
107 type Output = ForkChoiceUpdateResult;
108
109 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
110 self.get_mut().fut.poll_unpin(cx)
111 }
112}
113
114#[derive(Debug)]
117struct PendingPayloadId {
118 payload_status: Option<PayloadStatus>,
119 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
120}
121
122impl Future for PendingPayloadId {
123 type Output = ForkChoiceUpdateResult;
124
125 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126 let this = self.get_mut();
127 let res = ready!(this.pending_payload_id.poll_unpin(cx));
128 match res {
129 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
130 payload_status: this.payload_status.take().expect("Polled after completion"),
131 payload_id: Some(payload_id),
132 })),
133 Err(_) | Ok(Err(_)) => {
134 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
136 }
137 }
138 }
139}
140
141#[derive(Debug)]
144pub enum BeaconEngineMessage<Engine: EngineTypes> {
145 NewPayload {
147 payload: Engine::ExecutionData,
149 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
151 },
152 ForkchoiceUpdated {
154 state: ForkchoiceState,
156 payload_attrs: Option<Engine::PayloadAttributes>,
158 version: EngineApiMessageVersion,
160 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
162 },
163 TransitionConfigurationExchanged,
165}
166
167impl<Engine: EngineTypes> Display for BeaconEngineMessage<Engine> {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 match self {
170 Self::NewPayload { payload, .. } => {
171 write!(
172 f,
173 "NewPayload(parent: {}, number: {}, hash: {})",
174 payload.parent_hash(),
175 payload.block_number(),
176 payload.block_hash()
177 )
178 }
179 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
180 write!(
183 f,
184 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
185 payload_attrs.is_some()
186 )
187 }
188 Self::TransitionConfigurationExchanged => {
189 write!(f, "TransitionConfigurationExchanged")
190 }
191 }
192 }
193}
194
195#[derive(Debug, Clone)]
199pub struct BeaconConsensusEngineHandle<Engine>
200where
201 Engine: EngineTypes,
202{
203 to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
204}
205
206impl<Engine> BeaconConsensusEngineHandle<Engine>
207where
208 Engine: EngineTypes,
209{
210 pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
212 Self { to_engine }
213 }
214
215 pub async fn new_payload(
219 &self,
220 payload: Engine::ExecutionData,
221 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
222 let (tx, rx) = oneshot::channel();
223 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
224 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
225 }
226
227 pub async fn fork_choice_updated(
231 &self,
232 state: ForkchoiceState,
233 payload_attrs: Option<Engine::PayloadAttributes>,
234 version: EngineApiMessageVersion,
235 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
236 Ok(self
237 .send_fork_choice_updated(state, payload_attrs, version)
238 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
239 .await?
240 .map_err(BeaconForkChoiceUpdateError::internal)?
241 .await?)
242 }
243
244 fn send_fork_choice_updated(
247 &self,
248 state: ForkchoiceState,
249 payload_attrs: Option<Engine::PayloadAttributes>,
250 version: EngineApiMessageVersion,
251 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
252 let (tx, rx) = oneshot::channel();
253 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
254 state,
255 payload_attrs,
256 tx,
257 version,
258 });
259 rx
260 }
261
262 pub fn transition_configuration_exchanged(&self) {
269 let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
270 }
271}