reth_beacon_consensus/engine/handle.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
//! `BeaconConsensusEngine` external API
use crate::BeaconForkChoiceUpdateError;
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use futures::TryFutureExt;
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
OnForkChoiceUpdated,
};
use reth_errors::RethResult;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
/// engine task.
///
/// See also `BeaconConsensusEngine`
#[derive(Debug, Clone)]
pub struct BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
}
// === impl BeaconConsensusEngineHandle ===
impl<Engine> BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
Self { to_engine }
}
/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs, version)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}
/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
});
rx
}
/// Sends a transition configuration exchange message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
///
/// This only notifies about the exchange. The actual exchange is done by the engine API impl
/// itself.
pub fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
}