use crate::{metrics::SyncMetrics, StageCheckpoint, StageId};
use alloy_primitives::BlockNumber;
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::trace;
/// Sending half of an unbounded channel that carries [`MetricEvent`]s.
pub type MetricEventsSender = UnboundedSender<MetricEvent>;
/// Events that drive updates of the sync metrics.
#[derive(Clone, Copy, Debug)]
pub enum MetricEvent {
/// A new sync height was reached.
///
/// The listener handles this by applying a [`MetricEvent::StageCheckpoint`] at `height`
/// (with `max_block_number` also set to `height`) for every stage in `StageId::ALL`.
SyncHeight {
/// Block number of the height that was reached.
height: BlockNumber,
},
/// A single stage reported a checkpoint.
StageCheckpoint {
/// Stage the checkpoint belongs to.
stage_id: StageId,
/// The reported checkpoint; its block number is written to the stage's `checkpoint` metric.
checkpoint: StageCheckpoint,
/// Used as the stage's `entities_total` value when `checkpoint` carries no entity
/// progress; `None` leaves `entities_total` untouched in that case.
max_block_number: Option<BlockNumber>,
},
}
/// Listens for [`MetricEvent`]s on an unbounded channel and applies them to [`SyncMetrics`].
///
/// The listener is a [`Future`]: polling it handles every event currently available on the
/// channel, and it resolves once the channel is closed.
#[derive(Debug)]
pub struct MetricsListener {
/// Receiving half of the channel on which events arrive.
events_rx: UnboundedReceiver<MetricEvent>,
/// Metrics that are updated in response to received events.
pub(crate) sync_metrics: SyncMetrics,
}
impl MetricsListener {
    /// Builds a listener that reads [`MetricEvent`]s from `events_rx`, starting from
    /// default-initialized [`SyncMetrics`].
    pub fn new(events_rx: UnboundedReceiver<MetricEvent>) -> Self {
        let sync_metrics = SyncMetrics::default();
        Self { events_rx, sync_metrics }
    }

    /// Applies one [`MetricEvent`] to the tracked metrics.
    ///
    /// Every event is traced on arrival. A `SyncHeight` event is expanded into one
    /// `StageCheckpoint` event per stage in `StageId::ALL`, each of which is fed back through
    /// this method (and therefore traced as well). A `StageCheckpoint` event updates the
    /// stage's `checkpoint`, `entities_processed` and, when a total is known, `entities_total`
    /// metrics.
    fn handle_event(&mut self, event: MetricEvent) {
        trace!(target: "sync::metrics", ?event, "Metric event received");

        let (stage_id, checkpoint, max_block_number) = match event {
            MetricEvent::StageCheckpoint { stage_id, checkpoint, max_block_number } => {
                (stage_id, checkpoint, max_block_number)
            }
            MetricEvent::SyncHeight { height } => {
                // Fan the new height out to every stage, then stop: there is nothing
                // stage-specific left to record for the height event itself.
                for stage_id in StageId::ALL {
                    let checkpoint =
                        StageCheckpoint { block_number: height, stage_checkpoint: None };
                    let per_stage_event = MetricEvent::StageCheckpoint {
                        stage_id,
                        checkpoint,
                        max_block_number: Some(height),
                    };
                    self.handle_event(per_stage_event);
                }
                return;
            }
        };

        let stage_metrics = self.sync_metrics.get_stage_metrics(stage_id);
        stage_metrics.checkpoint.set(checkpoint.block_number as f64);

        // Prefer the checkpoint's own entity progress; otherwise fall back to block numbers,
        // in which case the total is only known if the sender supplied `max_block_number`.
        let (processed, total) = checkpoint.entities().map_or(
            (checkpoint.block_number, max_block_number),
            |entities| (entities.processed, Some(entities.total)),
        );

        stage_metrics.entities_processed.set(processed as f64);
        if let Some(total) = total {
            stage_metrics.entities_total.set(total as f64);
        }
    }
}
impl Future for MetricsListener {
    type Output = ();

    /// Handles events for as long as the channel yields them without pending.
    ///
    /// Returns `Poll::Pending` when the receiver has no event ready, and `Poll::Ready(())`
    /// once the receiver reports that the channel is closed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let listener = self.get_mut();

        // `ready!` returns `Poll::Pending` from `poll` as soon as the receiver is not ready;
        // the loop itself only ends when `poll_recv` yields `None`.
        while let Some(event) = ready!(listener.events_rx.poll_recv(cx)) {
            listener.handle_event(event);
        }

        Poll::Ready(())
    }
}