// reth_engine_local/miner.rs
use alloy_primitives::{TxHash, B256};
use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar, ForkchoiceState};
use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadKind, PayloadTypes,
};
use reth_provider::{BlockReader, ChainSpecProvider};
use reth_rpc_types_compat::engine::payload::block_to_payload;
use reth_transaction_pool::TransactionPool;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, UNIX_EPOCH},
};
use tokio::{
sync::{mpsc::UnboundedSender, oneshot},
time::Interval,
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::error;
/// Trigger that decides when the local miner should build the next block.
///
/// The type is itself a [`Future`] (see its `Future` impl): awaiting it resolves once the
/// selected trigger fires.
#[derive(Debug)]
pub enum MiningMode {
/// Fires each time the wrapped stream of transaction hashes yields an item.
///
/// Built by [`MiningMode::instant`] from a transaction pool's pending-transactions listener.
Instant(Fuse<ReceiverStream<TxHash>>),
/// Fires on every tick of the wrapped tokio [`Interval`].
///
/// Built by [`MiningMode::interval`].
Interval(Interval),
}
impl MiningMode {
    /// Creates a [`MiningMode::Instant`] driven by the pool's pending-transactions listener.
    ///
    /// The receiver returned by `pool.pending_transactions_listener()` is wrapped into a fused
    /// stream, so polling it after it has ended keeps returning "ended" instead of misbehaving.
    pub fn instant<Pool: TransactionPool>(pool: Pool) -> Self {
        let pending_hashes = ReceiverStream::new(pool.pending_transactions_listener());
        Self::Instant(pending_hashes.fuse())
    }

    /// Creates a [`MiningMode::Interval`] that ticks every `duration`.
    ///
    /// The first tick is scheduled one full `duration` from now rather than immediately.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval_at` panics when `duration` is zero.
    pub fn interval(duration: Duration) -> Self {
        let first_tick = tokio::time::Instant::now() + duration;
        let ticker = tokio::time::interval_at(first_tick, duration);
        Self::Interval(ticker)
    }
}
impl Future for MiningMode {
    type Output = ();

    /// Resolves with `()` once the configured trigger fires.
    ///
    /// * `Instant`: ready when the transaction stream yields an item. An ended stream
    ///   (`Ready(None)`) is reported as `Pending`, exactly like a stream with nothing to yield.
    /// * `Interval`: ready when the interval ticks.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let fired = match self.get_mut() {
            Self::Instant(pending_hashes) => {
                matches!(pending_hashes.poll_next_unpin(cx), Poll::Ready(Some(_)))
            }
            Self::Interval(ticker) => ticker.poll_tick(cx).is_ready(),
        };

        if fired {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
/// Local miner that advances the chain by driving the engine through its message channel.
///
/// Each time `mode` fires, the miner requests a new payload, submits it to the engine as a new
/// block, and records the resulting block hash.
#[derive(Debug)]
pub struct LocalMiner<EngineT: EngineTypes, Provider, B> {
/// Used to read the latest header at start-up and the chain spec while mining.
provider: Provider,
/// Builds the payload attributes for a given block timestamp.
payload_attributes_builder: B,
/// Channel over which forkchoice updates and new payloads are sent to the engine.
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
/// Trigger that decides when the next block is mined.
mode: MiningMode,
/// Handle used to resolve a built payload from its payload id.
payload_builder: PayloadBuilderHandle<EngineT>,
/// Timestamp of the most recently mined block (initially that of the latest header).
last_timestamp: u64,
/// Hashes of the most recent blocks, oldest first; trimmed to at most 64 entries after each
/// mined block.
last_block_hashes: Vec<B256>,
}
impl<EngineT, Provider, B> LocalMiner<EngineT, Provider, B>
where
EngineT: EngineTypes,
Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks> + 'static,
B: PayloadAttributesBuilder<<EngineT as PayloadTypes>::PayloadAttributes>,
{
pub fn spawn_new(
provider: Provider,
payload_attributes_builder: B,
to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
mode: MiningMode,
payload_builder: PayloadBuilderHandle<EngineT>,
) {
let latest_header =
provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
let miner = Self {
provider,
payload_attributes_builder,
to_engine,
mode,
payload_builder,
last_timestamp: latest_header.timestamp,
last_block_hashes: vec![latest_header.hash()],
};
tokio::spawn(miner.run());
}
async fn run(mut self) {
let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = &mut self.mode => {
if let Err(e) = self.advance().await {
error!(target: "engine::local", "Error advancing the chain: {:?}", e);
}
}
_ = fcu_interval.tick() => {
if let Err(e) = self.update_forkchoice_state().await {
error!(target: "engine::local", "Error updating fork choice: {:?}", e);
}
}
}
}
}
fn forkchoice_state(&self) -> ForkchoiceState {
ForkchoiceState {
head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
safe_block_hash: *self
.last_block_hashes
.get(self.last_block_hashes.len().saturating_sub(32))
.expect("at least 1 block exists"),
finalized_block_hash: *self
.last_block_hashes
.get(self.last_block_hashes.len().saturating_sub(64))
.expect("at least 1 block exists"),
}
}
async fn update_forkchoice_state(&self) -> eyre::Result<()> {
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state: self.forkchoice_state(),
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
})?;
let res = rx.await??;
if !res.forkchoice_status().is_valid() {
eyre::bail!("Invalid fork choice update")
}
Ok(())
}
async fn advance(&mut self) -> eyre::Result<()> {
let timestamp = std::cmp::max(
self.last_timestamp + 1,
std::time::SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("cannot be earlier than UNIX_EPOCH")
.as_secs(),
);
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state: self.forkchoice_state(),
payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
tx,
version: EngineApiMessageVersion::default(),
})?;
let res = rx.await??.await?;
if !res.payload_status.is_valid() {
eyre::bail!("Invalid payload status")
}
let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
let Some(Ok(payload)) =
self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
else {
eyre::bail!("No payload")
};
let block = payload.block();
let cancun_fields =
self.provider.chain_spec().is_cancun_active_at_timestamp(block.timestamp).then(|| {
CancunPayloadFields {
parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
versioned_hashes: block.blob_versioned_hashes().into_iter().copied().collect(),
}
});
let (tx, rx) = oneshot::channel();
self.to_engine.send(BeaconEngineMessage::NewPayload {
payload: block_to_payload(payload.block().clone()),
sidecar: cancun_fields
.map(ExecutionPayloadSidecar::v3)
.unwrap_or_else(ExecutionPayloadSidecar::none),
tx,
})?;
let res = rx.await??;
if !res.is_valid() {
eyre::bail!("Invalid payload")
}
self.last_timestamp = timestamp;
self.last_block_hashes.push(block.hash());
if self.last_block_hashes.len() > 64 {
self.last_block_hashes =
self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
}
Ok(())
}
}