1use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader};
5use alloy_primitives::{BlockNumber, B256};
6use alloy_rpc_types_engine::ForkchoiceState;
7use futures::Stream;
8use reth_engine_primitives::{ConsensusEngineEvent, ForkchoiceStatus};
9use reth_network_api::PeersInfo;
10use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
11use reth_prune_types::PrunerEvent;
12use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
13use reth_static_file_types::StaticFileProducerEvent;
14use std::{
15 fmt::{Display, Formatter},
16 future::Future,
17 pin::Pin,
18 task::{Context, Poll},
19 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
20};
21use tokio::time::Interval;
22use tracing::{debug, info, warn};
23
/// Period of the interval timer that drives the periodic `Status` log message.
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
26
/// Mutable state tracked while consuming node events; it is what the log messages are built from.
struct NodeState {
    /// Optional handle used to query the number of connected peers.
    peers_info: Option<Box<dyn PeersInfo>>,
    /// The pipeline stage currently being prepared, executed or unwound, if any.
    current_stage: Option<CurrentStage>,
    /// Latest known block number: updated when the finish stage reports a checkpoint and when the
    /// canonical chain is committed.
    latest_block: Option<BlockNumber>,
    /// Head block hash from the most recent forkchoice update.
    head_block_hash: Option<B256>,
    /// Safe block hash from the most recent forkchoice update.
    safe_block_hash: Option<B256>,
    /// Finalized block hash from the most recent forkchoice update.
    finalized_block_hash: Option<B256>,
    /// Unix timestamp, in seconds, of the last `Status` message logged while no stage was active.
    /// Used to rate-limit those messages.
    last_status_log_time: Option<u64>,
}
47
48impl NodeState {
49 const fn new(
50 peers_info: Option<Box<dyn PeersInfo>>,
51 latest_block: Option<BlockNumber>,
52 ) -> Self {
53 Self {
54 peers_info,
55 current_stage: None,
56 latest_block,
57 head_block_hash: None,
58 safe_block_hash: None,
59 finalized_block_hash: None,
60 last_status_log_time: None,
61 }
62 }
63
64 fn num_connected_peers(&self) -> usize {
65 self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
66 }
67
68 fn build_current_stage(
69 &self,
70 stage_id: StageId,
71 checkpoint: StageCheckpoint,
72 target: Option<BlockNumber>,
73 ) -> CurrentStage {
74 let (eta, entities_checkpoint) = self
75 .current_stage
76 .as_ref()
77 .filter(|current_stage| current_stage.stage_id == stage_id)
78 .map_or_else(
79 || (Eta::default(), None),
80 |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
81 );
82
83 CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
84 }
85
86 fn handle_pipeline_event(&mut self, event: PipelineEvent) {
88 match event {
89 PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
90 let checkpoint = checkpoint.unwrap_or_default();
91 let current_stage = self.build_current_stage(stage_id, checkpoint, target);
92
93 info!(
94 pipeline_stages = %pipeline_stages_progress,
95 stage = %stage_id,
96 checkpoint = %checkpoint.block_number,
97 target = %OptionalField(target),
98 "Preparing stage",
99 );
100
101 self.current_stage = Some(current_stage);
102 }
103 PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
104 let checkpoint = checkpoint.unwrap_or_default();
105 let current_stage = self.build_current_stage(stage_id, checkpoint, target);
106
107 if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
108 info!(
109 pipeline_stages = %pipeline_stages_progress,
110 stage = %stage_id,
111 checkpoint = %checkpoint.block_number,
112 target = %OptionalField(target),
113 %stage_eta,
114 "Executing stage",
115 );
116 } else {
117 info!(
118 pipeline_stages = %pipeline_stages_progress,
119 stage = %stage_id,
120 checkpoint = %checkpoint.block_number,
121 target = %OptionalField(target),
122 "Executing stage",
123 );
124 }
125
126 self.current_stage = Some(current_stage);
127 }
128 PipelineEvent::Ran {
129 pipeline_stages_progress,
130 stage_id,
131 result: ExecOutput { checkpoint, done },
132 } => {
133 if stage_id.is_finish() {
134 self.latest_block = Some(checkpoint.block_number);
135 }
136
137 if let Some(current_stage) = self.current_stage.as_mut() {
138 current_stage.checkpoint = checkpoint;
139 current_stage.entities_checkpoint = checkpoint.entities();
140 current_stage.eta.update(stage_id, checkpoint);
141
142 let target = OptionalField(current_stage.target);
143 let stage_progress = current_stage
144 .entities_checkpoint
145 .and_then(|entities| entities.fmt_percentage());
146 let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
147
148 let message = if done { "Finished stage" } else { "Committed stage progress" };
149
150 match (stage_progress, stage_eta) {
151 (Some(stage_progress), Some(stage_eta)) => {
152 info!(
153 pipeline_stages = %pipeline_stages_progress,
154 stage = %stage_id,
155 checkpoint = %checkpoint.block_number,
156 %target,
157 %stage_progress,
158 %stage_eta,
159 "{message}",
160 )
161 }
162 (Some(stage_progress), None) => {
163 info!(
164 pipeline_stages = %pipeline_stages_progress,
165 stage = %stage_id,
166 checkpoint = %checkpoint.block_number,
167 %target,
168 %stage_progress,
169 "{message}",
170 )
171 }
172 (None, Some(stage_eta)) => {
173 info!(
174 pipeline_stages = %pipeline_stages_progress,
175 stage = %stage_id,
176 checkpoint = %checkpoint.block_number,
177 %target,
178 %stage_eta,
179 "{message}",
180 )
181 }
182 (None, None) => {
183 info!(
184 pipeline_stages = %pipeline_stages_progress,
185 stage = %stage_id,
186 checkpoint = %checkpoint.block_number,
187 %target,
188 "{message}",
189 )
190 }
191 }
192 }
193
194 if done {
195 self.current_stage = None;
196 }
197 }
198 PipelineEvent::Unwind { stage_id, input } => {
199 let current_stage = CurrentStage {
200 stage_id,
201 eta: Eta::default(),
202 checkpoint: input.checkpoint,
203 target: Some(input.unwind_to),
204 entities_checkpoint: input.checkpoint.entities(),
205 };
206
207 self.current_stage = Some(current_stage);
208 }
209 PipelineEvent::Unwound { stage_id, result } => {
210 info!(stage = %stage_id, checkpoint = %result.checkpoint.block_number, "Unwound stage");
211 self.current_stage = None;
212 }
213 _ => (),
214 }
215 }
216
217 fn handle_consensus_engine_event<N: NodePrimitives>(&mut self, event: ConsensusEngineEvent<N>) {
218 match event {
219 ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
220 let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
221 state;
222 if self.safe_block_hash != Some(safe_block_hash) &&
223 self.finalized_block_hash != Some(finalized_block_hash)
224 {
225 let msg = match status {
226 ForkchoiceStatus::Valid => "Forkchoice updated",
227 ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
228 ForkchoiceStatus::Syncing => {
229 "Received forkchoice updated message when syncing"
230 }
231 };
232 info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
233 }
234 self.head_block_hash = Some(head_block_hash);
235 self.safe_block_hash = Some(safe_block_hash);
236 self.finalized_block_hash = Some(finalized_block_hash);
237 }
238 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
239 let block = executed.sealed_block();
240 let mut full = block.gas_used() as f64 * 100.0 / block.gas_limit() as f64;
241 if full.is_nan() {
242 full = 0.0;
243 }
244 info!(
245 number=block.number(),
246 hash=?block.hash(),
247 peers=self.num_connected_peers(),
248 txs=block.body().transactions().len(),
249 gas_used=%format_gas(block.gas_used()),
250 gas_throughput=%format_gas_throughput(block.gas_used(), elapsed),
251 gas_limit=%format_gas(block.gas_limit()),
252 full=%format!("{:.1}%", full),
253 base_fee=%format!("{:.2}Gwei", block.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
254 blobs=block.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
255 excess_blobs=block.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
256 ?elapsed,
257 "Block added to canonical chain"
258 );
259 }
260 ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
261 self.latest_block = Some(head.number());
262 info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
263 }
264 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
265 let block = executed.sealed_block();
266 info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
267 }
268 ConsensusEngineEvent::InvalidBlock(block) => {
269 warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
270 }
271 ConsensusEngineEvent::BlockReceived(num_hash) => {
272 info!(number=num_hash.number, hash=?num_hash.hash, "Received new payload from consensus engine");
273 }
274 }
275 }
276
277 fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
278 if self.current_stage.is_none() {
281 match event {
282 ConsensusLayerHealthEvent::NeverSeen => {
283 warn!(
284 "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
285 )
286 }
287 ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
288 warn!(
289 ?period,
290 "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
291 )
292 }
293 }
294 }
295 }
296
297 fn handle_pruner_event(&self, event: PrunerEvent) {
298 match event {
299 PrunerEvent::Started { tip_block_number } => {
300 debug!(tip_block_number, "Pruner started");
301 }
302 PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
303 let stats = format!(
304 "[{}]",
305 stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
306 );
307 debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
308 }
309 }
310 }
311
312 fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
313 match event {
314 StaticFileProducerEvent::Started { targets } => {
315 debug!(?targets, "Static File Producer started");
316 }
317 StaticFileProducerEvent::Finished { targets, elapsed } => {
318 debug!(?targets, ?elapsed, "Static File Producer finished");
319 }
320 }
321 }
322}
323
/// Display adapter for optional log fields.
///
/// Renders the inner value with its own [`Display`] implementation when present, and the literal
/// text `None` otherwise.
struct OptionalField<T: Display>(Option<T>);

impl<T: Display> Display for OptionalField<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            Some(field) => write!(f, "{field}"),
            None => f.write_str("None"),
        }
    }
}
338
/// The pipeline stage currently being tracked, together with its progress information.
struct CurrentStage {
    /// Identifier of the tracked stage.
    stage_id: StageId,
    /// ETA estimate for this stage; refreshed whenever the stage reports a new checkpoint.
    eta: Eta,
    /// Last checkpoint reported for this stage.
    checkpoint: StageCheckpoint,
    /// Entities checkpoint taken from the last reported checkpoint, if it carried one. Used to
    /// render the progress percentage.
    entities_checkpoint: Option<EntitiesCheckpoint>,
    /// Block number the stage is running (or unwinding) towards, if known.
    target: Option<BlockNumber>,
}
350
/// An event emitted by one of the node's components, as consumed by [`handle_events`].
#[derive(Debug, derive_more::From)]
pub enum NodeEvent<N: NodePrimitives> {
    /// A sync pipeline event.
    Pipeline(PipelineEvent),
    /// A consensus engine event.
    ConsensusEngine(ConsensusEngineEvent<N>),
    /// A consensus layer health event.
    ConsensusLayerHealth(ConsensusLayerHealthEvent),
    /// A pruner event.
    Pruner(PrunerEvent),
    /// A static file producer event.
    StaticFileProducer(StaticFileProducerEvent),
    /// Any other event, carried as a description; the handler logs it as a warning.
    Other(String),
}
368
369pub async fn handle_events<E, N: NodePrimitives>(
372 peers_info: Option<Box<dyn PeersInfo>>,
373 latest_block_number: Option<BlockNumber>,
374 events: E,
375) where
376 E: Stream<Item = NodeEvent<N>> + Unpin,
377{
378 let state = NodeState::new(peers_info, latest_block_number);
379
380 let start = tokio::time::Instant::now() + Duration::from_secs(3);
381 let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
382 info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
383
384 let handler = EventHandler { state, events, info_interval };
385 handler.await
386}
387
/// Future that drains the event stream into [`NodeState`] and logs a periodic status message.
#[pin_project::pin_project]
struct EventHandler<E> {
    /// State built up from the events seen so far.
    state: NodeState,
    /// Stream of incoming node events.
    #[pin]
    events: E,
    /// Timer whose ticks trigger the periodic `Status` log.
    #[pin]
    info_interval: Interval,
}
397
398impl<E, N: NodePrimitives> Future for EventHandler<E>
399where
400 E: Stream<Item = NodeEvent<N>> + Unpin,
401{
402 type Output = ();
403
404 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
405 let mut this = self.project();
406
407 while this.info_interval.poll_tick(cx).is_ready() {
408 if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
409 &this.state.current_stage
410 {
411 let stage_progress =
412 entities_checkpoint.and_then(|entities| entities.fmt_percentage());
413 let stage_eta = eta.fmt_for_stage(*stage_id);
414
415 match (stage_progress, stage_eta) {
416 (Some(stage_progress), Some(stage_eta)) => {
417 info!(
418 target: "reth::cli",
419 connected_peers = this.state.num_connected_peers(),
420 stage = %stage_id,
421 checkpoint = checkpoint.block_number,
422 target = %OptionalField(*target),
423 %stage_progress,
424 %stage_eta,
425 "Status"
426 )
427 }
428 (Some(stage_progress), None) => {
429 info!(
430 target: "reth::cli",
431 connected_peers = this.state.num_connected_peers(),
432 stage = %stage_id,
433 checkpoint = checkpoint.block_number,
434 target = %OptionalField(*target),
435 %stage_progress,
436 "Status"
437 )
438 }
439 (None, Some(stage_eta)) => {
440 info!(
441 target: "reth::cli",
442 connected_peers = this.state.num_connected_peers(),
443 stage = %stage_id,
444 checkpoint = checkpoint.block_number,
445 target = %OptionalField(*target),
446 %stage_eta,
447 "Status"
448 )
449 }
450 (None, None) => {
451 info!(
452 target: "reth::cli",
453 connected_peers = this.state.num_connected_peers(),
454 stage = %stage_id,
455 checkpoint = checkpoint.block_number,
456 target = %OptionalField(*target),
457 "Status"
458 )
459 }
460 }
461 } else {
462 let now =
463 SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
464
465 if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
467 if let Some(latest_block) = this.state.latest_block {
468 info!(
469 target: "reth::cli",
470 connected_peers = this.state.num_connected_peers(),
471 %latest_block,
472 "Status"
473 );
474 } else {
475 info!(
476 target: "reth::cli",
477 connected_peers = this.state.num_connected_peers(),
478 "Status"
479 );
480 }
481 this.state.last_status_log_time = Some(now);
482 }
483 }
484 }
485
486 while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
487 match event {
488 NodeEvent::Pipeline(event) => {
489 this.state.handle_pipeline_event(event);
490 }
491 NodeEvent::ConsensusEngine(event) => {
492 this.state.handle_consensus_engine_event(event);
493 }
494 NodeEvent::ConsensusLayerHealth(event) => {
495 this.state.handle_consensus_layer_health_event(event)
496 }
497 NodeEvent::Pruner(event) => {
498 this.state.handle_pruner_event(event);
499 }
500 NodeEvent::StaticFileProducer(event) => {
501 this.state.handle_static_file_producer_event(event);
502 }
503 NodeEvent::Other(event_description) => {
504 warn!("{event_description}");
505 }
506 }
507 }
508
509 Poll::Pending
510 }
511}
512
/// Estimated time remaining for a stage, derived from the progress made between two consecutive
/// entities checkpoints.
#[derive(Default, Copy, Clone)]
struct Eta {
    /// Entities checkpoint recorded by the last completed update.
    last_checkpoint: EntitiesCheckpoint,
    /// When `last_checkpoint` was recorded; `None` until the first update.
    last_checkpoint_time: Option<Instant>,
    /// Estimated remaining duration as of `last_checkpoint_time`, if one could be computed.
    eta: Option<Duration>,
}
526
527impl Eta {
528 fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
530 let Some(current) = checkpoint.entities() else { return };
531
532 if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
533 let Some(processed_since_last) =
534 current.processed.checked_sub(self.last_checkpoint.processed)
535 else {
536 self.eta = None;
537 debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
538 return
539 };
540 let elapsed = last_checkpoint_time.elapsed();
541 let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
542
543 let Some(remaining) = current.total.checked_sub(current.processed) else {
544 self.eta = None;
545 debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
546 return
547 };
548
549 self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
550 }
551
552 self.last_checkpoint = current;
553 self.last_checkpoint_time = Some(Instant::now());
554 }
555
556 fn is_available(&self) -> bool {
558 self.eta.zip(self.last_checkpoint_time).is_some()
559 }
560
561 fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
567 if !self.is_available() ||
568 matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
569 {
570 None
571 } else {
572 Some(self.to_string())
573 }
574 }
575}
576
577impl Display for Eta {
578 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579 if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
580 let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
581
582 if let Some(remaining) = remaining {
583 return write!(
584 f,
585 "{}",
586 humantime::format_duration(Duration::from_secs(remaining.as_secs()))
587 .to_string()
588 .replace(' ', "")
589 )
590 }
591 }
592
593 write!(f, "unknown")
594 }
595}
596
#[cfg(test)]
mod tests {
    use super::*;

    /// The rendered ETA must drop the sub-second part instead of showing it or rounding up.
    #[test]
    fn eta_display_no_milliseconds() {
        let minutes = Duration::from_secs(13 * 60);
        let seconds = Duration::from_secs(37);
        let milliseconds = Duration::from_millis(999);

        let eta = Eta {
            eta: Some(minutes + seconds + milliseconds),
            last_checkpoint_time: Some(Instant::now()),
            ..Default::default()
        };

        assert_eq!(eta.to_string(), "13m37s");
    }
}