1use crate::{
2 error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_rpc_types_engine::{
5 ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId,
6 PayloadStatus, PayloadStatusEnum,
7};
8use core::{
9 fmt::{self, Display},
10 future::Future,
11 pin::Pin,
12 task::{ready, Context, Poll},
13};
14use futures::{future::Either, FutureExt, TryFutureExt};
15use reth_errors::RethResult;
16use reth_payload_builder_primitives::PayloadBuilderError;
17use reth_payload_primitives::PayloadTypes;
18use std::time::{Duration, Instant};
19use tokio::sync::{mpsc::UnboundedSender, oneshot};
20
21#[deprecated(note = "Use ConsensusEngineHandle instead")]
23pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
24
25#[must_use = "futures do nothing unless you `.await` or poll them"]
29#[derive(Debug)]
30pub struct OnForkChoiceUpdated {
31 forkchoice_status: ForkchoiceStatus,
36 fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
38}
39
40impl OnForkChoiceUpdated {
43 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
45 self.forkchoice_status
46 }
47
48 pub fn syncing() -> Self {
50 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
51 Self {
52 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
53 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
54 }
55 }
56
57 pub fn valid(status: PayloadStatus) -> Self {
60 Self {
61 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
62 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
63 }
64 }
65
66 pub fn with_invalid(status: PayloadStatus) -> Self {
69 Self {
70 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
71 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
72 }
73 }
74
75 pub fn invalid_state() -> Self {
78 Self {
79 forkchoice_status: ForkchoiceStatus::Invalid,
80 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
81 }
82 }
83
84 pub fn invalid_payload_attributes() -> Self {
87 Self {
88 forkchoice_status: ForkchoiceStatus::Valid,
90 fut: Either::Left(futures::future::ready(Err(
91 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
92 ))),
93 }
94 }
95
96 pub const fn updated_with_pending_payload_id(
98 payload_status: PayloadStatus,
99 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
100 ) -> Self {
101 Self {
102 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
103 fut: Either::Right(PendingPayloadId {
104 payload_status: Some(payload_status),
105 pending_payload_id,
106 }),
107 }
108 }
109}
110
111impl Future for OnForkChoiceUpdated {
112 type Output = ForkChoiceUpdateResult;
113
114 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115 self.get_mut().fut.poll_unpin(cx)
116 }
117}
118
119#[derive(Debug)]
122struct PendingPayloadId {
123 payload_status: Option<PayloadStatus>,
124 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
125}
126
127impl Future for PendingPayloadId {
128 type Output = ForkChoiceUpdateResult;
129
130 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131 let this = self.get_mut();
132 let res = ready!(this.pending_payload_id.poll_unpin(cx));
133 match res {
134 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
135 payload_status: this.payload_status.take().expect("Polled after completion"),
136 payload_id: Some(payload_id),
137 })),
138 Err(_) | Ok(Err(_)) => {
139 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
141 }
142 }
143 }
144}
145
146#[derive(Debug, Clone, Copy)]
148pub struct NewPayloadTimings {
149 pub latency: Duration,
151 pub persistence_wait: Duration,
155 pub execution_cache_wait: Option<Duration>,
159 pub sparse_trie_wait: Option<Duration>,
163}
164
165#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
170pub struct BigBlockData<ExecutionData> {
171 pub env_switches: Vec<(usize, ExecutionData)>,
177 pub prior_block_hashes: Vec<(u64, alloy_primitives::B256)>,
181}
182
183impl<T> Default for BigBlockData<T> {
184 fn default() -> Self {
185 Self { env_switches: Vec::new(), prior_block_hashes: Vec::new() }
186 }
187}
188
189#[derive(Debug)]
192pub enum BeaconEngineMessage<Payload: PayloadTypes> {
193 NewPayload {
195 payload: Payload::ExecutionData,
197 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
199 },
200 RethNewPayload {
207 payload: Payload::ExecutionData,
209 wait_for_persistence: bool,
211 wait_for_caches: bool,
213 tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
215 enqueued_at: Instant,
217 },
218 ForkchoiceUpdated {
220 state: ForkchoiceState,
222 payload_attrs: Option<Payload::PayloadAttributes>,
224 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
226 },
227}
228
229impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 match self {
232 Self::NewPayload { payload, .. } => {
233 write!(
234 f,
235 "NewPayload(parent: {}, number: {}, hash: {})",
236 payload.parent_hash(),
237 payload.block_number(),
238 payload.block_hash()
239 )
240 }
241 Self::RethNewPayload { payload, .. } => {
242 write!(
243 f,
244 "RethNewPayload(parent: {}, number: {}, hash: {})",
245 payload.parent_hash(),
246 payload.block_number(),
247 payload.block_hash()
248 )
249 }
250 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
251 write!(
254 f,
255 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
256 payload_attrs.is_some()
257 )
258 }
259 }
260 }
261}
262
263#[derive(Debug, Clone)]
267pub struct ConsensusEngineHandle<Payload>
268where
269 Payload: PayloadTypes,
270{
271 to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
272}
273
274impl<Payload> ConsensusEngineHandle<Payload>
275where
276 Payload: PayloadTypes,
277{
278 pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
280 Self { to_engine }
281 }
282
283 pub async fn new_payload(
287 &self,
288 payload: Payload::ExecutionData,
289 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
290 let (tx, rx) = oneshot::channel();
291 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
292 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
293 }
294
295 pub async fn reth_new_payload(
302 &self,
303 payload: Payload::ExecutionData,
304 wait_for_persistence: bool,
305 wait_for_caches: bool,
306 ) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
307 let (tx, rx) = oneshot::channel();
308 let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload {
309 payload,
310 wait_for_persistence,
311 wait_for_caches,
312 tx,
313 enqueued_at: Instant::now(),
314 });
315 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
316 }
317
318 pub async fn fork_choice_updated(
322 &self,
323 state: ForkchoiceState,
324 payload_attrs: Option<Payload::PayloadAttributes>,
325 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
326 Ok(self
327 .send_fork_choice_updated(state, payload_attrs)
328 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
329 .await?
330 .map_err(BeaconForkChoiceUpdateError::internal)?
331 .await?)
332 }
333
334 fn send_fork_choice_updated(
337 &self,
338 state: ForkchoiceState,
339 payload_attrs: Option<Payload::PayloadAttributes>,
340 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
341 let (tx, rx) = oneshot::channel();
342 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
343 state,
344 payload_attrs,
345 tx,
346 });
347 rx
348 }
349}