use crate::cl::ConsensusLayerHealthEvent;
use alloy_consensus::constants::GWEI_TO_WEI;
use alloy_primitives::{BlockNumber, B256};
use alloy_rpc_types_engine::ForkchoiceState;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
use reth_engine_primitives::ForkchoiceStatus;
use reth_network_api::PeersInfo;
use reth_primitives_traits::{format_gas, format_gas_throughput};
use reth_prune_types::PrunerEvent;
use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
use reth_static_file_types::StaticFileProducerEvent;
use std::{
fmt::{Display, Formatter},
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::time::Interval;
use tracing::{debug, info, warn};
/// Period of the timer that drives the periodic `Status` log line (first tick is scheduled
/// separately in `handle_events`).
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
/// Snapshot of what the event logger currently knows about the node: peer information, the
/// pipeline stage in progress (if any), the latest block and the last forkchoice hashes.
struct NodeState {
/// Source for the number of connected peers; when `None` the reported count is `0`.
peers_info: Option<Box<dyn PeersInfo>>,
/// Pipeline stage currently tracked (set on prepare/run/unwind events, cleared once a stage
/// reports it is done).
current_stage: Option<CurrentStage>,
/// Latest block number known to the logger: seeded by the caller, then updated when the finish
/// stage has run or when a canonical chain is committed.
latest_block: Option<BlockNumber>,
/// `timestamp` of the most recently committed canonical head; compared against the current
/// Unix time in seconds when deciding whether to print an idle status line.
latest_block_time: Option<u64>,
/// Head block hash from the most recent forkchoice update event.
head_block_hash: Option<B256>,
/// Safe block hash from the most recent forkchoice update event.
safe_block_hash: Option<B256>,
/// Finalized block hash from the most recent forkchoice update event.
finalized_block_hash: Option<B256>,
}
impl NodeState {
/// Builds the initial state from the values supplied by the caller.
///
/// No stage is tracked yet, and the block time and all forkchoice hashes start out unknown.
const fn new(
    peers_info: Option<Box<dyn PeersInfo>>,
    latest_block: Option<BlockNumber>,
) -> Self {
    Self {
        // Caller-provided values.
        peers_info,
        latest_block,
        // Everything else is learned from events later on.
        current_stage: None,
        latest_block_time: None,
        head_block_hash: None,
        safe_block_hash: None,
        finalized_block_hash: None,
    }
}
/// Returns the number of connected peers, or `0` when no peer information source is available.
fn num_connected_peers(&self) -> usize {
    match &self.peers_info {
        Some(info) => info.num_connected_peers(),
        None => 0,
    }
}
/// Builds a [`CurrentStage`] for `stage_id` from the given checkpoint and target.
///
/// If the stage tracked so far has the same id, its ETA and entities checkpoint are carried
/// over; otherwise the ETA starts from its default and there is no entities checkpoint.
fn build_current_stage(
    &self,
    stage_id: StageId,
    checkpoint: StageCheckpoint,
    target: Option<BlockNumber>,
) -> CurrentStage {
    let (eta, entities_checkpoint) = match &self.current_stage {
        Some(previous) if previous.stage_id == stage_id => {
            (previous.eta, previous.entities_checkpoint)
        }
        _ => (Eta::default(), None),
    };

    CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
}
/// Processes an event emitted by the pipeline: logs it and keeps `current_stage` (and, for the
/// finish stage, `latest_block`) up to date.
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
// A stage is about to run: start (or keep) tracking it. A missing checkpoint is replaced by
// the default one.
PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = self.build_current_stage(stage_id, checkpoint, target);
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
"Preparing stage",
);
self.current_stage = Some(current_stage);
}
// A stage started executing. The `stage_eta` field is only emitted when an ETA can be
// formatted for this stage, hence the two otherwise identical log statements.
PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = self.build_current_stage(stage_id, checkpoint, target);
if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
%stage_eta,
"Executing stage",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
"Executing stage",
);
}
self.current_stage = Some(current_stage);
}
// A stage committed progress (`done == false`) or finished (`done == true`).
PipelineEvent::Ran {
pipeline_stages_progress,
stage_id,
result: ExecOutput { checkpoint, done },
} => {
// The finish stage's checkpoint becomes the latest known block.
if stage_id.is_finish() {
self.latest_block = Some(checkpoint.block_number);
}
// Nothing is logged here if no stage is currently tracked.
if let Some(current_stage) = self.current_stage.as_mut() {
current_stage.checkpoint = checkpoint;
current_stage.entities_checkpoint = checkpoint.entities();
current_stage.eta.update(stage_id, checkpoint);
let target = OptionalField(current_stage.target);
let stage_progress = current_stage
.entities_checkpoint
.and_then(|entities| entities.fmt_percentage());
let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
let message = if done { "Finished stage" } else { "Committed stage progress" };
// One arm per combination of available optional fields, so that an unavailable progress
// or ETA is omitted from the log line entirely.
match (stage_progress, stage_eta) {
(Some(stage_progress), Some(stage_eta)) => {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%stage_progress,
%stage_eta,
"{message}",
)
}
(Some(stage_progress), None) => {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%stage_progress,
"{message}",
)
}
(None, Some(stage_eta)) => {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%stage_eta,
"{message}",
)
}
(None, None) => {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
"{message}",
)
}
}
}
// A finished stage is no longer tracked.
if done {
self.current_stage = None;
}
}
// An unwind is tracked like a running stage, with a fresh ETA and the unwind destination as
// the target. Nothing is logged for it here.
PipelineEvent::Unwind { stage_id, input } => {
let current_stage = CurrentStage {
stage_id,
eta: Eta::default(),
checkpoint: input.checkpoint,
target: Some(input.unwind_to),
entities_checkpoint: input.checkpoint.entities(),
};
self.current_stage = Some(current_stage);
}
// All remaining pipeline events are ignored.
_ => (),
}
}
/// Processes an event emitted by the consensus engine: logs it and records the forkchoice
/// hashes and the latest committed block.
fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
match event {
BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
state;
// NOTE(review): because of `&&`, the message is logged only when BOTH the safe and the
// finalized hash differ from the stored ones; a change of just one of them is silent.
// Confirm this is intended rather than `||`.
if self.safe_block_hash != Some(safe_block_hash) &&
self.finalized_block_hash != Some(finalized_block_hash)
{
let msg = match status {
ForkchoiceStatus::Valid => "Forkchoice updated",
ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
ForkchoiceStatus::Syncing => {
"Received forkchoice updated message when syncing"
}
};
info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
}
// The stored hashes are refreshed regardless of whether anything was logged.
self.head_block_hash = Some(head_block_hash);
self.safe_block_hash = Some(safe_block_hash);
self.finalized_block_hash = Some(finalized_block_hash);
}
BeaconConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
match live_sync_progress {
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks,
target,
} => {
info!(
remaining_blocks,
target_block_hash=?target,
"Live sync in progress, downloading blocks"
);
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
// `full` is gas used as a percentage of the gas limit, `base_fee` is the base fee divided
// by `GWEI_TO_WEI` (a missing base fee counts as 0), and `blobs` / `excess_blobs` are the
// respective blob gas values divided by `DATA_GAS_PER_BLOB` (missing values count as 0).
info!(
number=block.number,
hash=?block.hash(),
peers=self.num_connected_peers(),
txs=block.body.transactions.len(),
gas=%format_gas(block.header.gas_used),
gas_throughput=%format_gas_throughput(block.header.gas_used, elapsed),
full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64),
base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
blobs=block.header.blob_gas_used.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
?elapsed,
"Block added to canonical chain"
);
}
BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
// Remember the new head; its timestamp is later used to decide whether the periodic
// status line should be printed.
self.latest_block = Some(head.number);
self.latest_block_time = Some(head.timestamp);
info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
info!(number=block.number, hash=?block.hash(), ?elapsed, "Block added to fork chain");
}
}
}
/// Emits a warning describing the reported consensus layer health problem.
///
/// Nothing is reported while a pipeline stage is being tracked.
fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
    // Presumably a running pipeline is not expected to hear from the consensus layer, so
    // health problems are only surfaced while idle — TODO confirm.
    if self.current_stage.is_some() {
        return;
    }

    match event {
        ConsensusLayerHealthEvent::NeverSeen => warn!(
            "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
        ),
        ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => warn!(
            ?period,
            "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!"
        ),
        ConsensusLayerHealthEvent::NeverReceivedUpdates => warn!(
            "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!"
        ),
        ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => warn!(
            ?period,
            "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
        ),
    }
}
/// Logs pruner start and finish events at debug level.
///
/// On finish, the per-segment stats are rendered as a bracketed, comma-separated list.
fn handle_pruner_event(&self, event: PrunerEvent) {
    match event {
        PrunerEvent::Started { tip_block_number } => {
            debug!(tip_block_number, "Pruner started");
        }
        PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
            let rendered: Vec<String> =
                stats.iter().map(|segment| segment.to_string()).collect();
            let pruned_segments = format!("[{}]", rendered.join(", "));
            debug!(tip_block_number, ?elapsed, %pruned_segments, "Pruner finished");
        }
    }
}
/// Logs static file producer start and finish events at debug level, including the targets and,
/// on finish, the elapsed time.
fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
match event {
StaticFileProducerEvent::Started { targets } => {
debug!(?targets, "Static File Producer started");
}
StaticFileProducerEvent::Finished { targets, elapsed } => {
debug!(?targets, ?elapsed, "Static File Producer finished");
}
}
}
}
/// Display adapter for optional log fields.
///
/// Renders the wrapped value with its own [`Display`] implementation when present and the
/// literal text `None` otherwise.
struct OptionalField<T: Display>(Option<T>);

impl<T: Display> Display for OptionalField<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self.0.as_ref() {
            Some(value) => write!(f, "{value}"),
            None => f.write_str("None"),
        }
    }
}
/// The pipeline stage currently tracked by the logger, together with its progress information.
struct CurrentStage {
/// Identifier of the tracked stage.
stage_id: StageId,
/// ETA estimate for the stage; updated whenever the stage reports progress.
eta: Eta,
/// Most recent checkpoint reported for the stage.
checkpoint: StageCheckpoint,
/// Entities part of the most recent checkpoint, if it has one; used to format the progress
/// percentage.
entities_checkpoint: Option<EntitiesCheckpoint>,
/// Block number the stage is working towards (the unwind destination for unwinds), if known.
target: Option<BlockNumber>,
}
/// A node event, wrapping the events of the individual components the logger listens to.
#[derive(Debug)]
pub enum NodeEvent {
/// A sync pipeline event.
Pipeline(PipelineEvent),
/// A consensus engine event.
ConsensusEngine(BeaconConsensusEngineEvent),
/// A consensus layer health event.
ConsensusLayerHealth(ConsensusLayerHealthEvent),
/// A pruner event.
Pruner(PrunerEvent),
/// A static file producer event.
StaticFileProducer(StaticFileProducerEvent),
/// Any other event, given as a free-form description; the handler logs it as a warning.
Other(String),
}
/// Wraps a pipeline event in [`NodeEvent::Pipeline`].
impl From<PipelineEvent> for NodeEvent {
    fn from(pipeline_event: PipelineEvent) -> Self {
        Self::Pipeline(pipeline_event)
    }
}
/// Wraps a consensus engine event in [`NodeEvent::ConsensusEngine`].
impl From<BeaconConsensusEngineEvent> for NodeEvent {
    fn from(engine_event: BeaconConsensusEngineEvent) -> Self {
        Self::ConsensusEngine(engine_event)
    }
}
/// Wraps a consensus layer health event in [`NodeEvent::ConsensusLayerHealth`].
impl From<ConsensusLayerHealthEvent> for NodeEvent {
    fn from(health_event: ConsensusLayerHealthEvent) -> Self {
        Self::ConsensusLayerHealth(health_event)
    }
}
/// Wraps a pruner event in [`NodeEvent::Pruner`].
impl From<PrunerEvent> for NodeEvent {
    fn from(pruner_event: PrunerEvent) -> Self {
        Self::Pruner(pruner_event)
    }
}
/// Wraps a static file producer event in [`NodeEvent::StaticFileProducer`].
impl From<StaticFileProducerEvent> for NodeEvent {
    fn from(producer_event: StaticFileProducerEvent) -> Self {
        Self::StaticFileProducer(producer_event)
    }
}
/// Consumes `events`, logging each one, and prints a periodic status line.
///
/// The first status tick is scheduled three seconds from now and subsequent ticks follow every
/// [`INFO_MESSAGE_INTERVAL`]; missed ticks are delayed rather than fired in a burst.
///
/// * `peers_info` - optional source for the connected peers count shown in log lines.
/// * `latest_block_number` - initial value for the latest known block number.
/// * `events` - stream of node events to process.
pub async fn handle_events<E>(
    peers_info: Option<Box<dyn PeersInfo>>,
    latest_block_number: Option<BlockNumber>,
    events: E,
) where
    E: Stream<Item = NodeEvent> + Unpin,
{
    let state = NodeState::new(peers_info, latest_block_number);

    let first_tick = tokio::time::Instant::now() + Duration::from_secs(3);
    let mut info_interval = tokio::time::interval_at(first_tick, INFO_MESSAGE_INTERVAL);
    info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    EventHandler { state, events, info_interval }.await
}
/// Future that owns the logger state, the stream of node events and the status timer.
#[pin_project::pin_project]
struct EventHandler<E> {
/// State updated by incoming events and read when printing the periodic status line.
state: NodeState,
/// Stream of node events to process.
#[pin]
events: E,
/// Timer whose ticks trigger the periodic status line.
#[pin]
info_interval: Interval,
}
impl<E> Future for EventHandler<E>
where
E: Stream<Item = NodeEvent> + Unpin,
{
type Output = ();
/// Prints a status line for every elapsed timer tick, then handles every event that is
/// currently ready on the stream.
///
/// The only value ever returned is `Poll::Pending`: the event loop below also stops when the
/// stream yields `None`, but the future does not complete in that case.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
// Drain all ticks that are ready; each produces at most one status line.
while this.info_interval.poll_tick(cx).is_ready() {
// A stage is being tracked: report its progress. One arm per combination of available
// optional fields, so that an unavailable progress or ETA is omitted from the log line.
if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
&this.state.current_stage
{
let stage_progress =
entities_checkpoint.and_then(|entities| entities.fmt_percentage());
let stage_eta = eta.fmt_for_stage(*stage_id);
match (stage_progress, stage_eta) {
(Some(stage_progress), Some(stage_eta)) => {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_progress,
%stage_eta,
"Status"
)
}
(Some(stage_progress), None) => {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_progress,
"Status"
)
}
(None, Some(stage_eta)) => {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_eta,
"Status"
)
}
(None, None) => {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
"Status"
)
}
}
} else if let Some(latest_block) = this.state.latest_block {
// No stage is tracked but a latest block is known: only print the status line when more
// than 60 seconds have passed since the latest committed block's timestamp. An unset
// timestamp counts as 0, and a clock before the Unix epoch yields `now == 0`.
let now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%latest_block,
"Status"
);
}
} else {
// Neither a stage nor a latest block is known: report the peer count only.
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
"Status"
);
}
}
// Handle every event that is ready right now; the loop ends on `Poll::Pending` as well as on
// `Poll::Ready(None)`.
while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
match event {
NodeEvent::Pipeline(event) => {
this.state.handle_pipeline_event(event);
}
NodeEvent::ConsensusEngine(event) => {
this.state.handle_consensus_engine_event(event);
}
NodeEvent::ConsensusLayerHealth(event) => {
this.state.handle_consensus_layer_health_event(event)
}
NodeEvent::Pruner(event) => {
this.state.handle_pruner_event(event);
}
NodeEvent::StaticFileProducer(event) => {
this.state.handle_static_file_producer_event(event);
}
NodeEvent::Other(event_description) => {
warn!("{event_description}");
}
}
}
Poll::Pending
}
}
/// Estimated time of arrival of a stage, derived from consecutive entities checkpoints.
#[derive(Default, Copy, Clone)]
struct Eta {
/// Entities checkpoint recorded by the most recent sample.
last_checkpoint: EntitiesCheckpoint,
/// Moment at which `last_checkpoint` was recorded; `None` until the first sample.
last_checkpoint_time: Option<Instant>,
/// Estimated time to completion as of `last_checkpoint_time`; `None` when it could not be
/// calculated.
eta: Option<Duration>,
}
impl Eta {
/// Records a new sample from `checkpoint` and recalculates the ETA from the progress made since
/// the previous sample.
///
/// Does nothing if the checkpoint has no entities. `stage` is only used in debug log output.
fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
let Some(current) = checkpoint.entities() else { return };
// An estimate needs a previous sample; the very first call only records the baseline.
if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
// NOTE: both early returns below clear the ETA and leave the stored baseline
// (`last_checkpoint`, `last_checkpoint_time`) untouched.
let Some(processed_since_last) =
current.processed.checked_sub(self.last_checkpoint.processed)
else {
self.eta = None;
debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
return
};
// Throughput in entities per second since the previous sample.
let elapsed = last_checkpoint_time.elapsed();
let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
let Some(remaining) = current.total.checked_sub(current.processed) else {
self.eta = None;
debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
return
};
// A quotient that cannot be converted to a `Duration` results in no ETA (presumably this
// covers a zero throughput producing a non-finite value — see `try_from_secs_f64` docs).
self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
}
self.last_checkpoint = current;
self.last_checkpoint_time = Some(Instant::now());
}
/// Returns `true` if both an ETA and the time of the last sample are known.
fn is_available(&self) -> bool {
    self.eta.is_some() && self.last_checkpoint_time.is_some()
}
/// Formats the ETA for logging, if there is one to show for `stage`.
///
/// Returns `None` for the `Headers`, `Bodies` and `Execution` stages, and whenever the ETA is
/// not available.
fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
    if matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution) {
        return None;
    }
    if !self.is_available() {
        return None;
    }
    Some(self.to_string())
}
}
/// Renders the time remaining until the estimated completion, truncated to whole seconds, or
/// `unknown` when there is no estimate or the estimated moment has already passed.
impl Display for Eta {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // The stored ETA is relative to the last sample, so subtract the time elapsed since.
        let time_left = self
            .eta
            .zip(self.last_checkpoint_time)
            .and_then(|(eta, sampled_at)| eta.checked_sub(sampled_at.elapsed()));

        match time_left {
            Some(left) => {
                // Drop the sub-second part so it does not show up in the output.
                let whole_seconds = Duration::from_secs(left.as_secs());
                write!(f, "{}", humantime::format_duration(whole_seconds))
            }
            None => write!(f, "unknown"),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The rendered ETA must not contain a sub-second component.
    #[test]
    fn eta_display_no_milliseconds() {
        // 13 minutes, 37 seconds and 999 milliseconds.
        let estimate = Duration::from_secs(13 * 60 + 37) + Duration::from_millis(999);
        let eta = Eta {
            last_checkpoint_time: Some(Instant::now()),
            eta: Some(estimate),
            ..Default::default()
        };

        assert_eq!(eta.to_string(), "13m 37s");
    }
}