1use crate::{
2 error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, ExecutionPayload, ForkchoiceStatus,
3};
4use alloy_eips::eip4895::Withdrawal;
5use alloy_primitives::{Bytes, B256};
6use alloy_rpc_types_engine::{
7 ExecutionData, ForkChoiceUpdateResult, ForkchoiceState, ForkchoiceUpdateError,
8 ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
9};
10use core::{
11 fmt::{self, Display},
12 future::Future,
13 pin::Pin,
14 task::{ready, Context, Poll},
15};
16use futures::{future::Either, FutureExt, TryFutureExt};
17use reth_errors::RethResult;
18use reth_payload_builder_primitives::PayloadBuilderError;
19use reth_payload_primitives::PayloadTypes;
20use std::time::{Duration, Instant};
21use tokio::sync::{mpsc::UnboundedSender, oneshot};
22
23#[deprecated(note = "Use ConsensusEngineHandle instead")]
25pub type BeaconConsensusEngineHandle<Payload> = ConsensusEngineHandle<Payload>;
26
27#[must_use = "futures do nothing unless you `.await` or poll them"]
31#[derive(Debug)]
32pub struct OnForkChoiceUpdated {
33 forkchoice_status: ForkchoiceStatus,
38 fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
40}
41
42impl OnForkChoiceUpdated {
45 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
47 self.forkchoice_status
48 }
49
50 pub fn syncing() -> Self {
52 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
53 Self {
54 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
55 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
56 }
57 }
58
59 pub fn valid(status: PayloadStatus) -> Self {
62 Self {
63 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
64 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
65 }
66 }
67
68 pub fn with_invalid(status: PayloadStatus) -> Self {
71 Self {
72 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
73 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
74 }
75 }
76
77 pub fn invalid_state() -> Self {
80 Self {
81 forkchoice_status: ForkchoiceStatus::Invalid,
82 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
83 }
84 }
85
86 pub fn invalid_payload_attributes() -> Self {
89 Self {
90 forkchoice_status: ForkchoiceStatus::Valid,
92 fut: Either::Left(futures::future::ready(Err(
93 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
94 ))),
95 }
96 }
97
98 pub const fn updated_with_pending_payload_id(
100 payload_status: PayloadStatus,
101 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
102 ) -> Self {
103 Self {
104 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
105 fut: Either::Right(PendingPayloadId {
106 payload_status: Some(payload_status),
107 pending_payload_id,
108 }),
109 }
110 }
111}
112
113impl Future for OnForkChoiceUpdated {
114 type Output = ForkChoiceUpdateResult;
115
116 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
117 self.get_mut().fut.poll_unpin(cx)
118 }
119}
120
121#[derive(Debug)]
124struct PendingPayloadId {
125 payload_status: Option<PayloadStatus>,
126 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
127}
128
129impl Future for PendingPayloadId {
130 type Output = ForkChoiceUpdateResult;
131
132 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133 let this = self.get_mut();
134 let res = ready!(this.pending_payload_id.poll_unpin(cx));
135 match res {
136 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
137 payload_status: this.payload_status.take().expect("Polled after completion"),
138 payload_id: Some(payload_id),
139 })),
140 Err(_) | Ok(Err(_)) => {
141 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
143 }
144 }
145 }
146}
147
148#[derive(Debug, Clone, Copy)]
150pub struct NewPayloadTimings {
151 pub latency: Duration,
153 pub persistence_wait: Duration,
157 pub execution_cache_wait: Option<Duration>,
161 pub sparse_trie_wait: Option<Duration>,
165}
166
167#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
172pub struct BigBlockData<ExecutionData> {
173 pub env_switches: Vec<ExecutionData>,
179 pub prior_block_hashes: Vec<(u64, alloy_primitives::B256)>,
183 pub block_number: u64,
185 #[serde(default, skip_serializing_if = "Option::is_none")]
187 pub merged_block_access_list: Option<Bytes>,
188}
189
190impl ExecutionPayload for BigBlockData<ExecutionData> {
191 fn parent_hash(&self) -> B256 {
192 self.env_switches[0].parent_hash()
193 }
194
195 fn block_hash(&self) -> B256 {
196 self.env_switches.last().unwrap().block_hash()
197 }
198
199 fn block_number(&self) -> u64 {
200 self.block_number
201 }
202
203 fn withdrawals(&self) -> Option<&Vec<Withdrawal>> {
204 self.env_switches[0].withdrawals()
205 }
206
207 fn block_access_list(&self) -> Option<&Bytes> {
208 self.merged_block_access_list.as_ref()
209 }
210
211 fn parent_beacon_block_root(&self) -> Option<B256> {
212 self.env_switches[0].parent_beacon_block_root()
213 }
214
215 fn timestamp(&self) -> u64 {
216 self.env_switches[0].timestamp()
217 }
218
219 fn gas_used(&self) -> u64 {
220 self.env_switches.iter().map(|data| data.gas_used()).sum()
221 }
222
223 fn gas_limit(&self) -> u64 {
224 self.env_switches.iter().map(|data| data.gas_limit()).sum()
225 }
226
227 fn transaction_count(&self) -> usize {
228 self.env_switches.iter().map(|data| data.transaction_count()).sum()
229 }
230
231 fn slot_number(&self) -> Option<u64> {
232 self.env_switches[0].payload.slot_number()
233 }
234}
235
236#[derive(Debug)]
239pub enum BeaconEngineMessage<Payload: PayloadTypes> {
240 NewPayload {
242 payload: Payload::ExecutionData,
244 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
246 },
247 RethNewPayload {
254 payload: Payload::ExecutionData,
256 wait_for_persistence: bool,
258 wait_for_caches: bool,
260 tx: oneshot::Sender<Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError>>,
262 enqueued_at: Instant,
264 },
265 ForkchoiceUpdated {
267 state: ForkchoiceState,
269 payload_attrs: Option<Payload::PayloadAttributes>,
271 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
273 },
274}
275
276impl<Payload: PayloadTypes> Display for BeaconEngineMessage<Payload> {
277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278 match self {
279 Self::NewPayload { payload, .. } => {
280 write!(
281 f,
282 "NewPayload(parent: {}, number: {}, hash: {})",
283 payload.parent_hash(),
284 payload.block_number(),
285 payload.block_hash()
286 )
287 }
288 Self::RethNewPayload { payload, .. } => {
289 write!(
290 f,
291 "RethNewPayload(parent: {}, number: {}, hash: {})",
292 payload.parent_hash(),
293 payload.block_number(),
294 payload.block_hash()
295 )
296 }
297 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
298 write!(
301 f,
302 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
303 payload_attrs.is_some()
304 )
305 }
306 }
307 }
308}
309
310#[derive(Debug, Clone)]
314pub struct ConsensusEngineHandle<Payload>
315where
316 Payload: PayloadTypes,
317{
318 to_engine: UnboundedSender<BeaconEngineMessage<Payload>>,
319}
320
321impl<Payload> ConsensusEngineHandle<Payload>
322where
323 Payload: PayloadTypes,
324{
325 pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Payload>>) -> Self {
327 Self { to_engine }
328 }
329
330 pub async fn new_payload(
334 &self,
335 payload: Payload::ExecutionData,
336 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
337 let (tx, rx) = oneshot::channel();
338 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, tx });
339 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
340 }
341
342 pub async fn reth_new_payload(
349 &self,
350 payload: Payload::ExecutionData,
351 wait_for_persistence: bool,
352 wait_for_caches: bool,
353 ) -> Result<(PayloadStatus, NewPayloadTimings), BeaconOnNewPayloadError> {
354 let (tx, rx) = oneshot::channel();
355 let _ = self.to_engine.send(BeaconEngineMessage::RethNewPayload {
356 payload,
357 wait_for_persistence,
358 wait_for_caches,
359 tx,
360 enqueued_at: Instant::now(),
361 });
362 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
363 }
364
365 pub async fn fork_choice_updated(
369 &self,
370 state: ForkchoiceState,
371 payload_attrs: Option<Payload::PayloadAttributes>,
372 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
373 Ok(self
374 .send_fork_choice_updated(state, payload_attrs)
375 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
376 .await?
377 .map_err(BeaconForkChoiceUpdateError::internal)?
378 .await?)
379 }
380
381 fn send_fork_choice_updated(
384 &self,
385 state: ForkchoiceState,
386 payload_attrs: Option<Payload::PayloadAttributes>,
387 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
388 let (tx, rx) = oneshot::channel();
389 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
390 state,
391 payload_attrs,
392 tx,
393 });
394 rx
395 }
396}