1use crate::{
2 error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_rpc_types_engine::{
5 ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
6 PayloadStatus, PayloadStatusEnum,
7};
8use core::{
9 fmt::{self, Display},
10 future::Future,
11 pin::Pin,
12 task::{ready, Context, Poll},
13};
14use futures::{future::Either, FutureExt, TryFutureExt};
15use reth_errors::RethResult;
16use reth_payload_builder_primitives::PayloadBuilderError;
17use reth_payload_primitives::{EngineApiMessageVersion, PayloadTypes};
18use std::time::Duration;
19use tokio::sync::{mpsc::UnboundedSender, oneshot};
20
21#[deprecated(note = "Use ConsensusEngineHandle instead")]
23pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
24
25#[must_use = "futures do nothing unless you `.await` or poll them"]
29#[derive(Debug)]
30pub struct OnForkChoiceUpdated {
31 forkchoice_status: ForkchoiceStatus,
36 fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
38}
39
40impl OnForkChoiceUpdated {
43 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
45 self.forkchoice_status
46 }
47
48 pub fn syncing() -> Self {
50 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
51 Self {
52 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
53 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
54 }
55 }
56
57 pub fn valid(status: PayloadStatus) -> Self {
60 Self {
61 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
62 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
63 }
64 }
65
66 pub fn with_invalid(status: PayloadStatus) -> Self {
69 Self {
70 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
71 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
72 }
73 }
74
75 pub fn invalid_state() -> Self {
78 Self {
79 forkchoice_status: ForkchoiceStatus::Invalid,
80 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
81 }
82 }
83
84 pub fn invalid_payload_attributes() -> Self {
87 Self {
88 forkchoice_status: ForkchoiceStatus::Valid,
90 fut: Either::Left(futures::future::ready(Err(
91 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
92 ))),
93 }
94 }
95
96 pub const fn updated_with_pending_payload_id(
98 payload_status: PayloadStatus,
99 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
100 ) -> Self {
101 Self {
102 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
103 fut: Either::Right(PendingPayloadId {
104 payload_status: Some(payload_status),
105 pending_payload_id,
106 }),
107 }
108 }
109}
110
111impl Future for OnForkChoiceUpdated {
112 type Output = ForkChoiceUpdateResult;
113
114 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115 self.get_mut().fut.poll_unpin(cx)
116 }
117}
118
119#[derive(Debug)]
122struct PendingPayloadId {
123 payload_status: Option<PayloadStatus>,
124 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
125}
126
127impl Future for PendingPayloadId {
128 type Output = ForkChoiceUpdateResult;
129
130 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131 let this = self.get_mut();
132 let res = ready!(this.pending_payload_id.poll_unpin(cx));
133 match res {
134 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
135 payload_status: this.payload_status.take().expect("Polled after completion"),
136 payload_id: Some(payload_id),
137 })),
138 Err(_) | Ok(Err(_)) => {
139 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
141 }
142 }
143 }
144}
145
/// Durations reported together with the payload status of a
/// [`BeaconEngineMessage::RethNewPayload`] request.
///
/// NOTE(review): the exact meaning of each measurement is defined by whoever fills in this
/// struct; the per-field notes below are inferred from the field names only — confirm.
#[derive(Clone, Copy, Debug)]
pub struct NewPayloadTimings {
    /// Presumably the overall latency of handling the request.
    pub latency: Duration,
    /// Presumably the time spent waiting on persistence; `None` when no such value is reported.
    pub persistence_wait: Option<Duration>,
    /// Presumably the time spent waiting for the execution cache.
    pub execution_cache_wait: Duration,
    /// Presumably the time spent waiting for the sparse trie.
    pub sparse_trie_wait: Duration,
}
159
160#[derive(Debug)]
163pub enum BeaconEngineMessage<Payload: PayloadTypes> {
164 NewPayload {
166 payload: Payload::ExecutionData,
168 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
170 },
171 RethNewPayload {
176 payload: Payload::ExecutionData,
178 tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
180 },
181 ForkchoiceUpdated {
183 state: ForkchoiceState,
185 payload_attrs: Option<Payload::PayloadAttributes>,
187 version: EngineApiMessageVersion,
189 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
191 },
192}
193
194impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 match self {
197 Self::NewPayload { payload, .. } => {
198 write!(
199 f,
200 "NewPayload(parent: {}, number: {}, hash: {})",
201 payload.parent_hash(),
202 payload.block_number(),
203 payload.block_hash()
204 )
205 }
206 Self::RethNewPayload { payload, .. } => {
207 write!(
208 f,
209 "RethNewPayload(parent: {}, number: {}, hash: {})",
210 payload.parent_hash(),
211 payload.block_number(),
212 payload.block_hash()
213 )
214 }
215 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
216 write!(
219 f,
220 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
221 payload_attrs.is_some()
222 )
223 }
224 }
225 }
226}
227
228#[derive(Debug, Clone)]
232pub struct ConsensusEngineHandle<Payload>
233where
234 Payload: PayloadTypes,
235{
236 to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
237}
238
239impl<Payload> ConsensusEngineHandle<Payload>
240where
241 Payload: PayloadTypes,
242{
243 pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
245 Self { to_engine }
246 }
247
248 pub async fn new_payload(
252 &self,
253 payload: Payload::ExecutionData,
254 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
255 let (tx, rx) = oneshot::channel();
256 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
257 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
258 }
259
260 pub async fn reth_new_payload(
265 &self,
266 payload: Payload::ExecutionData,
267 ) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
268 let (tx, rx) = oneshot::channel();
269 let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload { payload, tx });
270 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
271 }
272
273 pub async fn fork_choice_updated(
277 &self,
278 state: ForkchoiceState,
279 payload_attrs: Option<Payload::PayloadAttributes>,
280 version: EngineApiMessageVersion,
281 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
282 Ok(self
283 .send_fork_choice_updated(state, payload_attrs, version)
284 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
285 .await?
286 .map_err(BeaconForkChoiceUpdateError::internal)?
287 .await?)
288 }
289
290 fn send_fork_choice_updated(
293 &self,
294 state: ForkchoiceState,
295 payload_attrs: Option<Payload::PayloadAttributes>,
296 version: EngineApiMessageVersion,
297 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
298 let (tx, rx) = oneshot::channel();
299 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
300 state,
301 payload_attrs,
302 tx,
303 version,
304 });
305 rx
306 }
307}