1use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader};
5use alloy_primitives::{BlockNumber, B256};
6use alloy_rpc_types_engine::ForkchoiceState;
7use futures::Stream;
8use reth_engine_primitives::{
9 ConsensusEngineEvent, ConsensusEngineLiveSyncProgress, ForkchoiceStatus,
10};
11use reth_network_api::PeersInfo;
12use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
13use reth_prune_types::PrunerEvent;
14use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
15use reth_static_file_types::StaticFileProducerEvent;
16use std::{
17 fmt::{Display, Formatter},
18 future::Future,
19 pin::Pin,
20 task::{Context, Poll},
21 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
22};
23use tokio::time::Interval;
24use tracing::{debug, info, warn};
25
/// Period of the interval that drives the recurring `Status` log messages.
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
28
/// State tracked by the event handler in order to produce log messages: peer information, the
/// pipeline stage in flight, the latest known block and the last seen fork choice hashes.
struct NodeState {
    /// Source of the connected peer count; `None` makes the count report as zero.
    peers_info: Option<Box<dyn PeersInfo>>,
    /// The pipeline stage currently being prepared, executed or unwound, if any.
    current_stage: Option<CurrentStage>,
    /// Latest block number, taken from the finish stage checkpoint or from a canonical chain
    /// commit.
    latest_block: Option<BlockNumber>,
    /// Head block hash from the last fork choice update.
    head_block_hash: Option<B256>,
    /// Safe block hash from the last fork choice update.
    safe_block_hash: Option<B256>,
    /// Finalized block hash from the last fork choice update.
    finalized_block_hash: Option<B256>,
    /// Unix timestamp (seconds) of the last `Status` message logged while no stage was running.
    last_status_log_time: Option<u64>,
}
49
50impl NodeState {
51 const fn new(
52 peers_info: Option<Box<dyn PeersInfo>>,
53 latest_block: Option<BlockNumber>,
54 ) -> Self {
55 Self {
56 peers_info,
57 current_stage: None,
58 latest_block,
59 head_block_hash: None,
60 safe_block_hash: None,
61 finalized_block_hash: None,
62 last_status_log_time: None,
63 }
64 }
65
66 fn num_connected_peers(&self) -> usize {
67 self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
68 }
69
70 fn build_current_stage(
71 &self,
72 stage_id: StageId,
73 checkpoint: StageCheckpoint,
74 target: Option<BlockNumber>,
75 ) -> CurrentStage {
76 let (eta, entities_checkpoint) = self
77 .current_stage
78 .as_ref()
79 .filter(|current_stage| current_stage.stage_id == stage_id)
80 .map_or_else(
81 || (Eta::default(), None),
82 |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
83 );
84
85 CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
86 }
87
/// Logs a pipeline event and keeps `current_stage` (and, for the finish stage, `latest_block`)
/// in sync with it.
///
/// `Prepare`, `Run` and `Unwind` replace the tracked stage, `Ran` updates its checkpoint and ETA
/// and clears it once the stage reports `done`. Any other pipeline event is ignored.
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
    match event {
        PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
            // A stage without a stored checkpoint is logged with the default checkpoint.
            let checkpoint = checkpoint.unwrap_or_default();
            let current_stage = self.build_current_stage(stage_id, checkpoint, target);

            info!(
                pipeline_stages = %pipeline_stages_progress,
                stage = %stage_id,
                checkpoint = %checkpoint.block_number,
                target = %OptionalField(target),
                "Preparing stage",
            );

            self.current_stage = Some(current_stage);
        }
        PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
            let checkpoint = checkpoint.unwrap_or_default();
            let current_stage = self.build_current_stage(stage_id, checkpoint, target);

            // The ETA field is only attached when one is available for this stage.
            if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
                info!(
                    pipeline_stages = %pipeline_stages_progress,
                    stage = %stage_id,
                    checkpoint = %checkpoint.block_number,
                    target = %OptionalField(target),
                    %stage_eta,
                    "Executing stage",
                );
            } else {
                info!(
                    pipeline_stages = %pipeline_stages_progress,
                    stage = %stage_id,
                    checkpoint = %checkpoint.block_number,
                    target = %OptionalField(target),
                    "Executing stage",
                );
            }

            self.current_stage = Some(current_stage);
        }
        PipelineEvent::Ran {
            pipeline_stages_progress,
            stage_id,
            result: ExecOutput { checkpoint, done },
        } => {
            // The finish stage's checkpoint is recorded as the node's latest block.
            if stage_id.is_finish() {
                self.latest_block = Some(checkpoint.block_number);
            }

            // NOTE(review): the tracked stage is updated without comparing its id to `stage_id`;
            // presumably `Ran` always follows `Prepare`/`Run` for the same stage - confirm.
            if let Some(current_stage) = self.current_stage.as_mut() {
                current_stage.checkpoint = checkpoint;
                current_stage.entities_checkpoint = checkpoint.entities();
                current_stage.eta.update(stage_id, checkpoint);

                let target = OptionalField(current_stage.target);
                let stage_progress = current_stage
                    .entities_checkpoint
                    .and_then(|entities| entities.fmt_percentage());
                let stage_eta = current_stage.eta.fmt_for_stage(stage_id);

                let message = if done { "Finished stage" } else { "Committed stage progress" };

                // One log call per combination so that unavailable fields are left out entirely
                // instead of being printed with a placeholder value.
                match (stage_progress, stage_eta) {
                    (Some(stage_progress), Some(stage_eta)) => {
                        info!(
                            pipeline_stages = %pipeline_stages_progress,
                            stage = %stage_id,
                            checkpoint = %checkpoint.block_number,
                            %target,
                            %stage_progress,
                            %stage_eta,
                            "{message}",
                        )
                    }
                    (Some(stage_progress), None) => {
                        info!(
                            pipeline_stages = %pipeline_stages_progress,
                            stage = %stage_id,
                            checkpoint = %checkpoint.block_number,
                            %target,
                            %stage_progress,
                            "{message}",
                        )
                    }
                    (None, Some(stage_eta)) => {
                        info!(
                            pipeline_stages = %pipeline_stages_progress,
                            stage = %stage_id,
                            checkpoint = %checkpoint.block_number,
                            %target,
                            %stage_eta,
                            "{message}",
                        )
                    }
                    (None, None) => {
                        info!(
                            pipeline_stages = %pipeline_stages_progress,
                            stage = %stage_id,
                            checkpoint = %checkpoint.block_number,
                            %target,
                            "{message}",
                        )
                    }
                }
            }

            // A finished stage is no longer the current one.
            if done {
                self.current_stage = None;
            }
        }
        PipelineEvent::Unwind { stage_id, input } => {
            // Unwinding is tracked as a stage whose target is the unwind destination; the ETA
            // starts from its default, so nothing is carried over from a previous run.
            let current_stage = CurrentStage {
                stage_id,
                eta: Eta::default(),
                checkpoint: input.checkpoint,
                target: Some(input.unwind_to),
                entities_checkpoint: input.checkpoint.entities(),
            };

            self.current_stage = Some(current_stage);
        }
        // All remaining pipeline events neither log nor change state here.
        _ => (),
    }
}
214
215 fn handle_consensus_engine_event<N: NodePrimitives>(&mut self, event: ConsensusEngineEvent<N>) {
216 match event {
217 ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
218 let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
219 state;
220 if self.safe_block_hash != Some(safe_block_hash) &&
221 self.finalized_block_hash != Some(finalized_block_hash)
222 {
223 let msg = match status {
224 ForkchoiceStatus::Valid => "Forkchoice updated",
225 ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
226 ForkchoiceStatus::Syncing => {
227 "Received forkchoice updated message when syncing"
228 }
229 };
230 info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
231 }
232 self.head_block_hash = Some(head_block_hash);
233 self.safe_block_hash = Some(safe_block_hash);
234 self.finalized_block_hash = Some(finalized_block_hash);
235 }
236 ConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
237 match live_sync_progress {
238 ConsensusEngineLiveSyncProgress::DownloadingBlocks {
239 remaining_blocks,
240 target,
241 } => {
242 info!(
243 remaining_blocks,
244 target_block_hash=?target,
245 "Live sync in progress, downloading blocks"
246 );
247 }
248 }
249 }
250 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
251 let block = executed.sealed_block();
252 let mut full = block.gas_used() as f64 * 100.0 / block.gas_limit() as f64;
253 if full.is_nan() {
254 full = 0.0;
255 }
256 info!(
257 number=block.number(),
258 hash=?block.hash(),
259 peers=self.num_connected_peers(),
260 txs=block.body().transactions().len(),
261 gas_used=%format_gas(block.gas_used()),
262 gas_throughput=%format_gas_throughput(block.gas_used(), elapsed),
263 gas_limit=%format_gas(block.gas_limit()),
264 full=%format!("{:.1}%", full),
265 base_fee=%format!("{:.2}Gwei", block.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
266 blobs=block.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
267 excess_blobs=block.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
268 ?elapsed,
269 "Block added to canonical chain"
270 );
271 }
272 ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
273 self.latest_block = Some(head.number());
274 info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
275 }
276 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
277 let block = executed.sealed_block();
278 info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
279 }
280 ConsensusEngineEvent::InvalidBlock(block) => {
281 warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
282 }
283 ConsensusEngineEvent::BlockReceived(num_hash) => {
284 info!(number=num_hash.number, hash=?num_hash.hash, "Received block from consensus engine");
285 }
286 }
287 }
288
289 fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
290 if self.current_stage.is_none() {
293 match event {
294 ConsensusLayerHealthEvent::NeverSeen => {
295 warn!(
296 "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
297 )
298 }
299 ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
300 warn!(
301 ?period,
302 "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!"
303 )
304 }
305 ConsensusLayerHealthEvent::NeverReceivedUpdates => {
306 warn!(
307 "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!"
308 )
309 }
310 ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
311 warn!(
312 ?period,
313 "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
314 )
315 }
316 }
317 }
318 }
319
320 fn handle_pruner_event(&self, event: PrunerEvent) {
321 match event {
322 PrunerEvent::Started { tip_block_number } => {
323 debug!(tip_block_number, "Pruner started");
324 }
325 PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
326 let stats = format!(
327 "[{}]",
328 stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
329 );
330 debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
331 }
332 }
333 }
334
335 fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
336 match event {
337 StaticFileProducerEvent::Started { targets } => {
338 debug!(?targets, "Static File Producer started");
339 }
340 StaticFileProducerEvent::Finished { targets, elapsed } => {
341 debug!(?targets, ?elapsed, "Static File Producer finished");
342 }
343 }
344 }
345}
346
/// Wrapper that gives an optional value a [`Display`] implementation for use in log fields.
///
/// `Some(value)` is rendered exactly like `value`; `None` is rendered as the text `None`.
struct OptionalField<T: Display>(Option<T>);

impl<T: Display> Display for OptionalField<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            Some(field) => write!(f, "{field}"),
            None => f.write_str("None"),
        }
    }
}
361
/// The pipeline stage currently tracked by [`NodeState`], together with its progress data.
struct CurrentStage {
    /// Identifier of the stage.
    stage_id: StageId,
    /// ETA estimator for the stage.
    eta: Eta,
    /// Most recent checkpoint reported for the stage.
    checkpoint: StageCheckpoint,
    /// Entities progress from the most recent checkpoint, used to print the progress
    /// percentage; `None` until a checkpoint that carries one has been recorded.
    entities_checkpoint: Option<EntitiesCheckpoint>,
    /// Block number the stage is working towards (the unwind destination when unwinding).
    target: Option<BlockNumber>,
}
373
/// An event that the node event handler knows how to log.
#[derive(Debug, derive_more::From)]
pub enum NodeEvent<N: NodePrimitives> {
    /// A sync pipeline event.
    Pipeline(PipelineEvent),
    /// A consensus engine event.
    ConsensusEngine(ConsensusEngineEvent<N>),
    /// A consensus layer health event.
    ConsensusLayerHealth(ConsensusLayerHealthEvent),
    /// A pruner event.
    Pruner(PrunerEvent),
    /// A static file producer event.
    StaticFileProducer(StaticFileProducerEvent),
    /// A free-form event description; the handler logs it as a warning.
    Other(String),
}
391
392pub async fn handle_events<E, N: NodePrimitives>(
395 peers_info: Option<Box<dyn PeersInfo>>,
396 latest_block_number: Option<BlockNumber>,
397 events: E,
398) where
399 E: Stream<Item = NodeEvent<N>> + Unpin,
400{
401 let state = NodeState::new(peers_info, latest_block_number);
402
403 let start = tokio::time::Instant::now() + Duration::from_secs(3);
404 let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
405 info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
406
407 let handler = EventHandler { state, events, info_interval };
408 handler.await
409}
410
/// Future that logs incoming node events and periodic status messages.
#[pin_project::pin_project]
struct EventHandler<E> {
    /// Node state that the log messages are derived from.
    state: NodeState,
    /// Stream of node events to consume.
    #[pin]
    events: E,
    /// Ticker that triggers the periodic `Status` message.
    #[pin]
    info_interval: Interval,
}
420
impl<E, N: NodePrimitives> Future for EventHandler<E>
where
    E: Stream<Item = NodeEvent<N>> + Unpin,
{
    type Output = ();

    /// Logs a `Status` message for every elapsed interval tick, then drains and handles all
    /// events that are currently ready.
    ///
    /// This always returns [`Poll::Pending`], so the future never resolves on its own.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Handle every tick that is already due.
        while this.info_interval.poll_tick(cx).is_ready() {
            if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
                &this.state.current_stage
            {
                // A stage is in flight: report its progress on every tick.
                let stage_progress =
                    entities_checkpoint.and_then(|entities| entities.fmt_percentage());
                let stage_eta = eta.fmt_for_stage(*stage_id);

                // One log call per combination so that unavailable fields are left out.
                match (stage_progress, stage_eta) {
                    (Some(stage_progress), Some(stage_eta)) => {
                        info!(
                            target: "reth::cli",
                            connected_peers = this.state.num_connected_peers(),
                            stage = %stage_id,
                            checkpoint = checkpoint.block_number,
                            target = %OptionalField(*target),
                            %stage_progress,
                            %stage_eta,
                            "Status"
                        )
                    }
                    (Some(stage_progress), None) => {
                        info!(
                            target: "reth::cli",
                            connected_peers = this.state.num_connected_peers(),
                            stage = %stage_id,
                            checkpoint = checkpoint.block_number,
                            target = %OptionalField(*target),
                            %stage_progress,
                            "Status"
                        )
                    }
                    (None, Some(stage_eta)) => {
                        info!(
                            target: "reth::cli",
                            connected_peers = this.state.num_connected_peers(),
                            stage = %stage_id,
                            checkpoint = checkpoint.block_number,
                            target = %OptionalField(*target),
                            %stage_eta,
                            "Status"
                        )
                    }
                    (None, None) => {
                        info!(
                            target: "reth::cli",
                            connected_peers = this.state.num_connected_peers(),
                            stage = %stage_id,
                            checkpoint = checkpoint.block_number,
                            target = %OptionalField(*target),
                            "Status"
                        )
                    }
                }
            } else {
                // No stage is running: throttle the status message using wall-clock seconds.
                // A clock reading before the Unix epoch is treated as 0.
                let now =
                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();

                // Log only when more than 60 seconds passed since the last idle status; with no
                // previous log the comparison is made against timestamp 0.
                if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
                    if let Some(latest_block) = this.state.latest_block {
                        info!(
                            target: "reth::cli",
                            connected_peers = this.state.num_connected_peers(),
                            %latest_block,
                            "Status"
                        );
                    } else {
                        info!(
                            target: "reth::cli",
                            connected_peers = this.state.num_connected_peers(),
                            "Status"
                        );
                    }
                    this.state.last_status_log_time = Some(now);
                }
            }
        }

        // Drain every event that is ready right now and dispatch it to its handler.
        // NOTE(review): the loop also stops when the stream yields `None`, but the future still
        // returns `Pending` and will poll the finished stream again on the next wake-up -
        // confirm the stream type tolerates being polled after completion.
        while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
            match event {
                NodeEvent::Pipeline(event) => {
                    this.state.handle_pipeline_event(event);
                }
                NodeEvent::ConsensusEngine(event) => {
                    this.state.handle_consensus_engine_event(event);
                }
                NodeEvent::ConsensusLayerHealth(event) => {
                    this.state.handle_consensus_layer_health_event(event)
                }
                NodeEvent::Pruner(event) => {
                    this.state.handle_pruner_event(event);
                }
                NodeEvent::StaticFileProducer(event) => {
                    this.state.handle_static_file_producer_event(event);
                }
                NodeEvent::Other(event_description) => {
                    warn!("{event_description}");
                }
            }
        }

        Poll::Pending
    }
}
535
/// Estimated time until a stage finishes, derived from consecutive entities checkpoints.
///
/// The displayed value is the stored estimate minus the time elapsed since the checkpoint it
/// was computed at.
#[derive(Default, Copy, Clone)]
struct Eta {
    /// Entities checkpoint recorded by the last update.
    last_checkpoint: EntitiesCheckpoint,
    /// Moment at which `last_checkpoint` was recorded; `None` before the first update.
    last_checkpoint_time: Option<Instant>,
    /// Estimate computed by the last update, if it could be calculated.
    eta: Option<Duration>,
}
549
550impl Eta {
551 fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
553 let Some(current) = checkpoint.entities() else { return };
554
555 if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
556 let Some(processed_since_last) =
557 current.processed.checked_sub(self.last_checkpoint.processed)
558 else {
559 self.eta = None;
560 debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
561 return
562 };
563 let elapsed = last_checkpoint_time.elapsed();
564 let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
565
566 let Some(remaining) = current.total.checked_sub(current.processed) else {
567 self.eta = None;
568 debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
569 return
570 };
571
572 self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
573 }
574
575 self.last_checkpoint = current;
576 self.last_checkpoint_time = Some(Instant::now());
577 }
578
579 fn is_available(&self) -> bool {
581 self.eta.zip(self.last_checkpoint_time).is_some()
582 }
583
584 fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
590 if !self.is_available() ||
591 matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
592 {
593 None
594 } else {
595 Some(self.to_string())
596 }
597 }
598}
599
600impl Display for Eta {
601 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
602 if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
603 let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
604
605 if let Some(remaining) = remaining {
606 return write!(
607 f,
608 "{}",
609 humantime::format_duration(Duration::from_secs(remaining.as_secs()))
610 )
611 }
612 }
613
614 write!(f, "unknown")
615 }
616}
617
#[cfg(test)]
mod tests {
    use super::*;

    /// The displayed ETA is truncated to whole seconds.
    #[test]
    fn eta_display_no_milliseconds() {
        // 13m 37s plus 999ms: the sub-second part must not show up in the output, and the
        // slack keeps the result stable as long as formatting happens within that time.
        let estimate = Duration::from_millis(13 * 60 * 1000 + 37 * 1000 + 999);
        let eta = Eta {
            last_checkpoint_time: Some(Instant::now()),
            eta: Some(estimate),
            ..Default::default()
        };

        let rendered = eta.to_string();

        assert_eq!(rendered, "13m 37s");
    }
}