1use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::{
5 constants::{GWEI_TO_WEI, MGAS_TO_GAS},
6 BlockHeader,
7};
8use alloy_primitives::{BlockNumber, B256};
9use alloy_rpc_types_engine::ForkchoiceState;
10use futures::Stream;
11use reth_engine_primitives::{ConsensusEngineEvent, ForkchoiceStatus, SlowBlockInfo};
12use reth_network_api::PeersInfo;
13use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
14use reth_prune_types::PrunerEvent;
15use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
16use reth_static_file_types::StaticFileProducerEvent;
17use std::{
18 fmt::{Display, Formatter},
19 future::Future,
20 pin::Pin,
21 task::{Context, Poll},
22 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
23};
24use tokio::time::Interval;
25use tracing::{debug, info, warn};
26
27const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
29
30struct NodeState {
35 peers_info: Option<Box<dyn PeersInfo>>,
37 current_stage: Option<CurrentStage>,
39 latest_block: Option<BlockNumber>,
41 head_block_hash: Option<B256>,
43 safe_block_hash: Option<B256>,
45 finalized_block_hash: Option<B256>,
47 last_status_log_time: Option<u64>,
49}
50
51impl NodeState {
52 const fn new(
53 peers_info: Option<Box<dyn PeersInfo>>,
54 latest_block: Option<BlockNumber>,
55 ) -> Self {
56 Self {
57 peers_info,
58 current_stage: None,
59 latest_block,
60 head_block_hash: None,
61 safe_block_hash: None,
62 finalized_block_hash: None,
63 last_status_log_time: None,
64 }
65 }
66
67 fn num_connected_peers(&self) -> usize {
68 self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
69 }
70
71 fn build_current_stage(
72 &self,
73 stage_id: StageId,
74 checkpoint: StageCheckpoint,
75 target: Option<BlockNumber>,
76 ) -> CurrentStage {
77 let (eta, entities_checkpoint) = self
78 .current_stage
79 .as_ref()
80 .filter(|current_stage| current_stage.stage_id == stage_id)
81 .map_or_else(
82 || (Eta::default(), None),
83 |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
84 );
85
86 CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
87 }
88
89 fn handle_pipeline_event(&mut self, event: PipelineEvent) {
91 match event {
92 PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
93 let checkpoint = checkpoint.unwrap_or_default();
94 let current_stage = self.build_current_stage(stage_id, checkpoint, target);
95
96 info!(
97 pipeline_stages = %pipeline_stages_progress,
98 stage = %stage_id,
99 checkpoint = %checkpoint.block_number,
100 target = %OptionalField(target),
101 "Preparing stage",
102 );
103
104 self.current_stage = Some(current_stage);
105 }
106 PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
107 let checkpoint = checkpoint.unwrap_or_default();
108 let current_stage = self.build_current_stage(stage_id, checkpoint, target);
109
110 if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
111 info!(
112 pipeline_stages = %pipeline_stages_progress,
113 stage = %stage_id,
114 checkpoint = %checkpoint.block_number,
115 target = %OptionalField(target),
116 %stage_eta,
117 "Executing stage",
118 );
119 } else {
120 info!(
121 pipeline_stages = %pipeline_stages_progress,
122 stage = %stage_id,
123 checkpoint = %checkpoint.block_number,
124 target = %OptionalField(target),
125 "Executing stage",
126 );
127 }
128
129 self.current_stage = Some(current_stage);
130 }
131 PipelineEvent::Ran {
132 pipeline_stages_progress,
133 stage_id,
134 result: ExecOutput { checkpoint, done },
135 } => {
136 if stage_id.is_finish() {
137 self.latest_block = Some(checkpoint.block_number);
138 }
139
140 if let Some(current_stage) = self.current_stage.as_mut() {
141 current_stage.checkpoint = checkpoint;
142 current_stage.entities_checkpoint = checkpoint.entities();
143 current_stage.eta.update(stage_id, checkpoint);
144
145 let target = OptionalField(current_stage.target);
146 let stage_progress = current_stage
147 .entities_checkpoint
148 .and_then(|entities| entities.fmt_percentage());
149 let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
150
151 let message = if done { "Finished stage" } else { "Committed stage progress" };
152
153 match (stage_progress, stage_eta) {
154 (Some(stage_progress), Some(stage_eta)) => {
155 info!(
156 pipeline_stages = %pipeline_stages_progress,
157 stage = %stage_id,
158 checkpoint = %checkpoint.block_number,
159 %target,
160 %stage_progress,
161 %stage_eta,
162 "{message}",
163 )
164 }
165 (Some(stage_progress), None) => {
166 info!(
167 pipeline_stages = %pipeline_stages_progress,
168 stage = %stage_id,
169 checkpoint = %checkpoint.block_number,
170 %target,
171 %stage_progress,
172 "{message}",
173 )
174 }
175 (None, Some(stage_eta)) => {
176 info!(
177 pipeline_stages = %pipeline_stages_progress,
178 stage = %stage_id,
179 checkpoint = %checkpoint.block_number,
180 %target,
181 %stage_eta,
182 "{message}",
183 )
184 }
185 (None, None) => {
186 info!(
187 pipeline_stages = %pipeline_stages_progress,
188 stage = %stage_id,
189 checkpoint = %checkpoint.block_number,
190 %target,
191 "{message}",
192 )
193 }
194 }
195 }
196
197 if done {
198 self.current_stage = None;
199 }
200 }
201 PipelineEvent::Unwind { stage_id, input } => {
202 let current_stage = CurrentStage {
203 stage_id,
204 eta: Eta::default(),
205 checkpoint: input.checkpoint,
206 target: Some(input.unwind_to),
207 entities_checkpoint: input.checkpoint.entities(),
208 };
209
210 self.current_stage = Some(current_stage);
211 }
212 PipelineEvent::Unwound { stage_id, result } => {
213 info!(stage = %stage_id, checkpoint = %result.checkpoint.block_number, "Unwound stage");
214 self.current_stage = None;
215 }
216 _ => (),
217 }
218 }
219
220 fn handle_consensus_engine_event<N: NodePrimitives>(&mut self, event: ConsensusEngineEvent<N>) {
221 match event {
222 ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
223 let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
224 state;
225 if self.safe_block_hash != Some(safe_block_hash) &&
226 self.finalized_block_hash != Some(finalized_block_hash)
227 {
228 let msg = match status {
229 ForkchoiceStatus::Valid => "Forkchoice updated",
230 ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
231 ForkchoiceStatus::Syncing => {
232 "Received forkchoice updated message when syncing"
233 }
234 };
235 info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
236 }
237 self.head_block_hash = Some(head_block_hash);
238 self.safe_block_hash = Some(safe_block_hash);
239 self.finalized_block_hash = Some(finalized_block_hash);
240 }
241 ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
242 let block = executed.sealed_block();
243 let mut full = block.gas_used() as f64 * 100.0 / block.gas_limit() as f64;
244 if full.is_nan() {
245 full = 0.0;
246 }
247 info!(
248 number=block.number(),
249 hash=?block.hash(),
250 peers=self.num_connected_peers(),
251 txs=block.body().transactions().len(),
252 gas_used=%format_gas(block.gas_used()),
253 gas_throughput=%format_gas_throughput(block.gas_used(), elapsed),
254 gas_limit=%format_gas(block.gas_limit()),
255 full=%format!("{:.1}%", full),
256 base_fee=%format!("{:.2}Gwei", block.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
257 blobs=block.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
258 excess_blobs=block.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
259 ?elapsed,
260 "Block added to canonical chain"
261 );
262 }
263 ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
264 self.latest_block = Some(head.number());
265 info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
266 }
267 ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
268 let block = executed.sealed_block();
269 info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
270 }
271 ConsensusEngineEvent::InvalidBlock(block) => {
272 warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
273 }
274 ConsensusEngineEvent::BlockReceived(num_hash) => {
275 info!(number=num_hash.number, hash=?num_hash.hash, "Received new payload from consensus engine");
276 }
277 ConsensusEngineEvent::SlowBlock(info) => {
278 Self::log_slow_block(&info);
279 }
280 }
281 }
282
283 fn log_slow_block(info: &SlowBlockInfo) {
284 fn hit_rate(hits: usize, misses: usize) -> f64 {
285 let total = hits + misses;
286 if total > 0 {
287 (hits as f64 / total as f64) * 100.0
288 } else {
289 0.0
290 }
291 }
292
293 let stats = &info.stats;
294 let processing_secs =
295 stats.execution_duration.as_secs_f64() + stats.state_hash_duration.as_secs_f64();
296 let mgas_per_sec = if processing_secs > 0.0 {
297 (stats.gas_used as f64 / MGAS_TO_GAS as f64) / processing_secs
298 } else {
299 0.0
300 };
301
302 macro_rules! log_slow_block_fields {
305 ($($commit_field:tt)*) => {
306 warn!(
307 target: "reth::slow_block",
308 message = "Slow block",
309 block.number = stats.block_number,
310 block.hash = ?stats.block_hash,
311 block.gas_used = stats.gas_used,
312 block.tx_count = stats.tx_count,
313 timing.execution_ms = stats.execution_duration.as_millis(),
314 timing.state_read_ms = stats.state_read_duration.as_millis(),
315 timing.state_hash_ms = stats.state_hash_duration.as_millis(),
316 $($commit_field)*
317 timing.total_ms = info.total_duration.as_millis(),
318 throughput.mgas_per_sec = format!("{:.2}", mgas_per_sec),
319 state_reads.accounts = stats.accounts_read,
320 state_reads.storage_slots = stats.storage_read,
321 state_reads.code = stats.code_read,
322 state_reads.code_bytes = stats.code_bytes_read,
323 state_writes.accounts = stats.accounts_changed,
324 state_writes.accounts_deleted = stats.accounts_deleted,
325 state_writes.storage_slots = stats.storage_slots_changed,
326 state_writes.storage_slots_deleted = stats.storage_slots_deleted,
327 state_writes.code = stats.bytecodes_changed,
328 state_writes.code_bytes = stats.code_bytes_written,
329 state_writes.eip7702_delegations_set = stats.eip7702_delegations_set,
330 state_writes.eip7702_delegations_cleared = stats.eip7702_delegations_cleared,
331 cache.account.hits = stats.account_cache_hits,
332 cache.account.misses = stats.account_cache_misses,
333 cache.account.hit_rate = format!("{:.2}", hit_rate(stats.account_cache_hits, stats.account_cache_misses)),
334 cache.storage.hits = stats.storage_cache_hits,
335 cache.storage.misses = stats.storage_cache_misses,
336 cache.storage.hit_rate = format!("{:.2}", hit_rate(stats.storage_cache_hits, stats.storage_cache_misses)),
337 cache.code.hits = stats.code_cache_hits,
338 cache.code.misses = stats.code_cache_misses,
339 cache.code.hit_rate = format!("{:.2}", hit_rate(stats.code_cache_hits, stats.code_cache_misses)),
340 );
341 }
342 }
343
344 if let Some(commit_dur) = info.commit_duration {
345 log_slow_block_fields!(timing.commit_ms = commit_dur.as_millis(),);
346 } else {
347 log_slow_block_fields!();
348 }
349 }
350
351 fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
352 if self.current_stage.is_none() {
355 match event {
356 ConsensusLayerHealthEvent::NeverSeen => {
357 warn!(
358 "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
359 )
360 }
361 ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
362 warn!(
363 ?period,
364 "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
365 )
366 }
367 }
368 }
369 }
370
371 fn handle_pruner_event(&self, event: PrunerEvent) {
372 match event {
373 PrunerEvent::Started { tip_block_number } => {
374 debug!(tip_block_number, "Pruner started");
375 }
376 PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
377 let stats = format!(
378 "[{}]",
379 stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
380 );
381 debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
382 }
383 }
384 }
385
386 fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
387 match event {
388 StaticFileProducerEvent::Started { targets } => {
389 debug!(?targets, "Static File Producer started");
390 }
391 StaticFileProducerEvent::Finished { targets, elapsed } => {
392 debug!(?targets, ?elapsed, "Static File Producer finished");
393 }
394 }
395 }
396}
397
398struct OptionalField<T: Display>(Option<T>);
402
403impl<T: Display> Display for OptionalField<T> {
404 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
405 if let Some(field) = &self.0 {
406 write!(f, "{field}")
407 } else {
408 write!(f, "None")
409 }
410 }
411}
412
413struct CurrentStage {
415 stage_id: StageId,
416 eta: Eta,
417 checkpoint: StageCheckpoint,
418 entities_checkpoint: Option<EntitiesCheckpoint>,
422 target: Option<BlockNumber>,
423}
424
425#[derive(Debug, derive_more::From)]
427pub enum NodeEvent<N: NodePrimitives> {
428 Pipeline(PipelineEvent),
430 ConsensusEngine(ConsensusEngineEvent<N>),
432 ConsensusLayerHealth(ConsensusLayerHealthEvent),
434 Pruner(PrunerEvent),
436 StaticFileProducer(StaticFileProducerEvent),
438 Other(String),
441}
442
443pub async fn handle_events<E, N: NodePrimitives>(
446 peers_info: Option<Box<dyn PeersInfo>>,
447 latest_block_number: Option<BlockNumber>,
448 events: E,
449) where
450 E: Stream<Item = NodeEvent<N>> + Unpin,
451{
452 let state = NodeState::new(peers_info, latest_block_number);
453
454 let start = tokio::time::Instant::now() + Duration::from_secs(3);
455 let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
456 info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
457
458 let handler = EventHandler { state, events, info_interval };
459 handler.await
460}
461
462#[pin_project::pin_project]
464struct EventHandler<E> {
465 state: NodeState,
466 #[pin]
467 events: E,
468 #[pin]
469 info_interval: Interval,
470}
471
472impl<E, N: NodePrimitives> Future for EventHandler<E>
473where
474 E: Stream<Item = NodeEvent<N>> + Unpin,
475{
476 type Output = ();
477
478 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
479 let mut this = self.project();
480
481 while this.info_interval.poll_tick(cx).is_ready() {
482 if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
483 &this.state.current_stage
484 {
485 let stage_progress =
486 entities_checkpoint.and_then(|entities| entities.fmt_percentage());
487 let stage_eta = eta.fmt_for_stage(*stage_id);
488
489 match (stage_progress, stage_eta) {
490 (Some(stage_progress), Some(stage_eta)) => {
491 info!(
492 target: "reth::cli",
493 connected_peers = this.state.num_connected_peers(),
494 stage = %stage_id,
495 checkpoint = checkpoint.block_number,
496 target = %OptionalField(*target),
497 %stage_progress,
498 %stage_eta,
499 "Status"
500 )
501 }
502 (Some(stage_progress), None) => {
503 info!(
504 target: "reth::cli",
505 connected_peers = this.state.num_connected_peers(),
506 stage = %stage_id,
507 checkpoint = checkpoint.block_number,
508 target = %OptionalField(*target),
509 %stage_progress,
510 "Status"
511 )
512 }
513 (None, Some(stage_eta)) => {
514 info!(
515 target: "reth::cli",
516 connected_peers = this.state.num_connected_peers(),
517 stage = %stage_id,
518 checkpoint = checkpoint.block_number,
519 target = %OptionalField(*target),
520 %stage_eta,
521 "Status"
522 )
523 }
524 (None, None) => {
525 info!(
526 target: "reth::cli",
527 connected_peers = this.state.num_connected_peers(),
528 stage = %stage_id,
529 checkpoint = checkpoint.block_number,
530 target = %OptionalField(*target),
531 "Status"
532 )
533 }
534 }
535 } else {
536 let now =
537 SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
538
539 if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
541 if let Some(latest_block) = this.state.latest_block {
542 info!(
543 target: "reth::cli",
544 connected_peers = this.state.num_connected_peers(),
545 %latest_block,
546 "Status"
547 );
548 } else {
549 info!(
550 target: "reth::cli",
551 connected_peers = this.state.num_connected_peers(),
552 "Status"
553 );
554 }
555 this.state.last_status_log_time = Some(now);
556 }
557 }
558 }
559
560 while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
561 match event {
562 NodeEvent::Pipeline(event) => {
563 this.state.handle_pipeline_event(event);
564 }
565 NodeEvent::ConsensusEngine(event) => {
566 this.state.handle_consensus_engine_event(event);
567 }
568 NodeEvent::ConsensusLayerHealth(event) => {
569 this.state.handle_consensus_layer_health_event(event)
570 }
571 NodeEvent::Pruner(event) => {
572 this.state.handle_pruner_event(event);
573 }
574 NodeEvent::StaticFileProducer(event) => {
575 this.state.handle_static_file_producer_event(event);
576 }
577 NodeEvent::Other(event_description) => {
578 warn!("{event_description}");
579 }
580 }
581 }
582
583 Poll::Pending
584 }
585}
586
587#[derive(Default, Copy, Clone)]
592struct Eta {
593 last_checkpoint: EntitiesCheckpoint,
595 last_checkpoint_time: Option<Instant>,
597 eta: Option<Duration>,
599}
600
601impl Eta {
602 fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
604 let Some(current) = checkpoint.entities() else { return };
605
606 if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
607 let Some(processed_since_last) =
608 current.processed.checked_sub(self.last_checkpoint.processed)
609 else {
610 self.eta = None;
611 debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
612 return
613 };
614 let elapsed = last_checkpoint_time.elapsed();
615 let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
616
617 let Some(remaining) = current.total.checked_sub(current.processed) else {
618 self.eta = None;
619 debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
620 return
621 };
622
623 self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
624 }
625
626 self.last_checkpoint = current;
627 self.last_checkpoint_time = Some(Instant::now());
628 }
629
630 fn is_available(&self) -> bool {
632 self.eta.zip(self.last_checkpoint_time).is_some()
633 }
634
635 fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
641 if !self.is_available() ||
642 matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
643 {
644 None
645 } else {
646 Some(self.to_string())
647 }
648 }
649}
650
651impl Display for Eta {
652 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653 if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
654 let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
655
656 if let Some(remaining) = remaining {
657 return write!(
658 f,
659 "{}",
660 humantime::format_duration(Duration::from_secs(remaining.as_secs()))
661 .to_string()
662 .replace(' ', "")
663 )
664 }
665 }
666
667 write!(f, "unknown")
668 }
669}
670
671#[cfg(test)]
672mod tests {
673 use super::*;
674
675 #[test]
676 fn eta_display_no_milliseconds() {
677 let eta = Eta {
678 last_checkpoint_time: Some(Instant::now()),
679 eta: Some(Duration::from_millis(
680 13 * 60 * 1000 + 37 * 1000 + 999, )),
684 ..Default::default()
685 }
686 .to_string();
687
688 assert_eq!(eta, "13m37s");
689 }
690}