// reth_beacon_consensus/engine/handle.rs
use crate::{
engine::message::OnForkChoiceUpdated, BeaconConsensusEngineEvent, BeaconEngineMessage,
BeaconForkChoiceUpdateError, BeaconOnNewPayloadError,
};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use futures::TryFutureExt;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::RethResult;
use reth_tokio_util::{EventSender, EventStream};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// A cloneable handle used to communicate with the beacon consensus engine.
///
/// The handle bundles the sending half of the unbounded channel that carries
/// [`BeaconEngineMessage`]s to the engine together with the [`EventSender`]
/// from which new event listeners are created.
#[derive(Debug, Clone)]
pub struct BeaconConsensusEngineHandle<Engine: EngineTypes> {
    /// Sending half of the channel on which messages are submitted to the engine.
    pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
    /// Sender from which [`BeaconConsensusEngineEvent`] listener streams are created.
    event_sender: EventSender<BeaconConsensusEngineEvent>,
}
impl<Engine> BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
pub const fn new(
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
) -> Self {
Self { to_engine, event_sender }
}
pub async fn new_payload(
&self,
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs, version)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
});
rx
}
pub fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
pub fn event_listener(&self) -> EventStream<BeaconConsensusEngineEvent> {
self.event_sender.new_listener()
}
}