// reth_beacon_consensus/engine/message.rs
use crate::engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkChoiceUpdateResult, ForkchoiceState,
ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
};
use futures::{future::Either, FutureExt};
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::RethResult;
use reth_payload_primitives::PayloadBuilderError;
use std::{
fmt::Display,
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
/// Outcome of a forkchoice update, exposed as a future.
///
/// Awaiting this value yields a [`ForkChoiceUpdateResult`]. The [`ForkchoiceStatus`] chosen at
/// construction time can be read without polling via [`Self::forkchoice_status`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnForkChoiceUpdated {
/// Status fixed when the value is constructed; polling the future does not modify it.
forkchoice_status: ForkchoiceStatus,
/// Either a result that is ready immediately (`Left`) or a wait on a pending payload id
/// (`Right`).
fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
}
impl OnForkChoiceUpdated {
pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
self.forkchoice_status
}
pub fn syncing() -> Self {
let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
Self {
forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
pub fn valid(status: PayloadStatus) -> Self {
Self {
forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
pub fn with_invalid(status: PayloadStatus) -> Self {
Self {
forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
pub fn invalid_state() -> Self {
Self {
forkchoice_status: ForkchoiceStatus::Invalid,
fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
}
}
pub fn invalid_payload_attributes() -> Self {
Self {
forkchoice_status: ForkchoiceStatus::Valid,
fut: Either::Left(futures::future::ready(Err(
ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
))),
}
}
pub const fn updated_with_pending_payload_id(
payload_status: PayloadStatus,
pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
) -> Self {
Self {
forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
fut: Either::Right(PendingPayloadId {
payload_status: Some(payload_status),
pending_payload_id,
}),
}
}
}
impl Future for OnForkChoiceUpdated {
    type Output = ForkChoiceUpdateResult;

    /// Polls the wrapped future (ready result or pending payload id) and forwards its outcome
    /// unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &mut self.get_mut().fut;
        Pin::new(inner).poll(cx)
    }
}
/// Future that waits for a payload id and then resolves to a [`ForkChoiceUpdateResult`].
#[derive(Debug)]
struct PendingPayloadId {
/// Payload status returned on success; held in an `Option` so it can be moved out when the
/// future completes.
payload_status: Option<PayloadStatus>,
/// Receiving half of the oneshot channel on which the payload id (or a builder error) arrives.
pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
}
impl Future for PendingPayloadId {
    type Output = ForkChoiceUpdateResult;

    /// Waits for the payload id channel and converts its outcome into a forkchoice result.
    ///
    /// Returns `Poll::Pending` until the receiver is ready. A received id is combined with the
    /// stored payload status into `Ok(ForkchoiceUpdated { .. })`. A closed channel or a builder
    /// error both become [`ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes`].
    ///
    /// # Panics
    ///
    /// Panics if polled again after having returned a successful result, because the stored
    /// payload status has already been taken.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        let payload_id = match ready!(this.pending_payload_id.poll_unpin(cx)) {
            Ok(Ok(id)) => id,
            // Either the sender was dropped or the builder reported an error: no payload id.
            Ok(Err(_)) | Err(_) => {
                return Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
            }
        };

        let payload_status = this.payload_status.take().expect("Polled after completion");
        Poll::Ready(Ok(ForkchoiceUpdated { payload_status, payload_id: Some(payload_id) }))
    }
}
/// Messages carried to the beacon consensus engine, generic over the engine's types.
#[derive(Debug)]
pub enum BeaconEngineMessage<Engine: EngineTypes> {
/// A new execution payload, together with a channel for the resulting status.
NewPayload {
/// The execution payload.
payload: ExecutionPayload,
/// Sidecar accompanying the payload.
sidecar: ExecutionPayloadSidecar,
/// Sender on which the [`PayloadStatus`] or a [`BeaconOnNewPayloadError`] is returned.
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
},
/// A forkchoice update, together with a channel for the outcome.
ForkchoiceUpdated {
/// The forkchoice state of the update.
state: ForkchoiceState,
/// Optional payload attributes, typed by the engine.
payload_attrs: Option<Engine::PayloadAttributes>,
/// Engine API version associated with this message.
version: EngineApiMessageVersion,
/// Sender on which the [`OnForkChoiceUpdated`] outcome or an error is returned.
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
},
/// Carries no data. NOTE(review): presumably signals that a transition configuration
/// exchange took place — confirm against the handler.
TransitionConfigurationExchanged,
}
impl<Engine: EngineTypes> Display for BeaconEngineMessage<Engine> {
    /// Writes a compact, single-line summary of the message.
    ///
    /// `NewPayload` shows the payload's parent hash, block number and block hash.
    /// `ForkchoiceUpdated` shows the state and only whether payload attributes are present,
    /// not the attributes themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NewPayload { payload, .. } => write!(
                f,
                "NewPayload(parent: {}, number: {}, hash: {})",
                payload.parent_hash(),
                payload.block_number(),
                payload.block_hash()
            ),
            // Only the presence of the attributes is printed, never their contents.
            Self::ForkchoiceUpdated { state, payload_attrs, .. } => write!(
                f,
                "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
                payload_attrs.is_some()
            ),
            Self::TransitionConfigurationExchanged => {
                f.write_str("TransitionConfigurationExchanged")
            }
        }
    }
}