reth_node_events/
node.rs

1//! Support for handling events emitted by node components.
2
3use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader};
5use alloy_primitives::{BlockNumber, B256};
6use alloy_rpc_types_engine::ForkchoiceState;
7use futures::Stream;
8use reth_engine_primitives::{
9    ConsensusEngineEvent, ConsensusEngineLiveSyncProgress, ForkchoiceStatus,
10};
11use reth_network_api::PeersInfo;
12use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
13use reth_prune_types::PrunerEvent;
14use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
15use reth_static_file_types::StaticFileProducerEvent;
16use std::{
17    fmt::{Display, Formatter},
18    future::Future,
19    pin::Pin,
20    task::{Context, Poll},
21    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
22};
23use tokio::time::Interval;
24use tracing::{debug, info, warn};
25
26/// Interval of reporting node state.
27const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
28
29/// The current high-level state of the node, including the node's database environment, network
30/// connections, current processing stage, and the latest block information. It provides
31/// methods to handle different types of events that affect the node's state, such as pipeline
32/// events, network events, and consensus engine events.
33struct NodeState {
34    /// Information about connected peers.
35    peers_info: Option<Box<dyn PeersInfo>>,
36    /// The stage currently being executed.
37    current_stage: Option<CurrentStage>,
38    /// The latest block reached by either pipeline or consensus engine.
39    latest_block: Option<BlockNumber>,
40    /// Hash of the head block last set by fork choice update
41    head_block_hash: Option<B256>,
42    /// Hash of the safe block last set by fork choice update
43    safe_block_hash: Option<B256>,
44    /// Hash of finalized block last set by fork choice update
45    finalized_block_hash: Option<B256>,
46    /// The time when we last logged a status message
47    last_status_log_time: Option<u64>,
48}
49
50impl NodeState {
51    const fn new(
52        peers_info: Option<Box<dyn PeersInfo>>,
53        latest_block: Option<BlockNumber>,
54    ) -> Self {
55        Self {
56            peers_info,
57            current_stage: None,
58            latest_block,
59            head_block_hash: None,
60            safe_block_hash: None,
61            finalized_block_hash: None,
62            last_status_log_time: None,
63        }
64    }
65
66    fn num_connected_peers(&self) -> usize {
67        self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
68    }
69
70    fn build_current_stage(
71        &self,
72        stage_id: StageId,
73        checkpoint: StageCheckpoint,
74        target: Option<BlockNumber>,
75    ) -> CurrentStage {
76        let (eta, entities_checkpoint) = self
77            .current_stage
78            .as_ref()
79            .filter(|current_stage| current_stage.stage_id == stage_id)
80            .map_or_else(
81                || (Eta::default(), None),
82                |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
83            );
84
85        CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
86    }
87
88    /// Processes an event emitted by the pipeline
89    fn handle_pipeline_event(&mut self, event: PipelineEvent) {
90        match event {
91            PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
92                let checkpoint = checkpoint.unwrap_or_default();
93                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
94
95                info!(
96                    pipeline_stages = %pipeline_stages_progress,
97                    stage = %stage_id,
98                    checkpoint = %checkpoint.block_number,
99                    target = %OptionalField(target),
100                    "Preparing stage",
101                );
102
103                self.current_stage = Some(current_stage);
104            }
105            PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
106                let checkpoint = checkpoint.unwrap_or_default();
107                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
108
109                if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
110                    info!(
111                        pipeline_stages = %pipeline_stages_progress,
112                        stage = %stage_id,
113                        checkpoint = %checkpoint.block_number,
114                        target = %OptionalField(target),
115                        %stage_eta,
116                        "Executing stage",
117                    );
118                } else {
119                    info!(
120                        pipeline_stages = %pipeline_stages_progress,
121                        stage = %stage_id,
122                        checkpoint = %checkpoint.block_number,
123                        target = %OptionalField(target),
124                        "Executing stage",
125                    );
126                }
127
128                self.current_stage = Some(current_stage);
129            }
130            PipelineEvent::Ran {
131                pipeline_stages_progress,
132                stage_id,
133                result: ExecOutput { checkpoint, done },
134            } => {
135                if stage_id.is_finish() {
136                    self.latest_block = Some(checkpoint.block_number);
137                }
138
139                if let Some(current_stage) = self.current_stage.as_mut() {
140                    current_stage.checkpoint = checkpoint;
141                    current_stage.entities_checkpoint = checkpoint.entities();
142                    current_stage.eta.update(stage_id, checkpoint);
143
144                    let target = OptionalField(current_stage.target);
145                    let stage_progress = current_stage
146                        .entities_checkpoint
147                        .and_then(|entities| entities.fmt_percentage());
148                    let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
149
150                    let message = if done { "Finished stage" } else { "Committed stage progress" };
151
152                    match (stage_progress, stage_eta) {
153                        (Some(stage_progress), Some(stage_eta)) => {
154                            info!(
155                                pipeline_stages = %pipeline_stages_progress,
156                                stage = %stage_id,
157                                checkpoint = %checkpoint.block_number,
158                                %target,
159                                %stage_progress,
160                                %stage_eta,
161                                "{message}",
162                            )
163                        }
164                        (Some(stage_progress), None) => {
165                            info!(
166                                pipeline_stages = %pipeline_stages_progress,
167                                stage = %stage_id,
168                                checkpoint = %checkpoint.block_number,
169                                %target,
170                                %stage_progress,
171                                "{message}",
172                            )
173                        }
174                        (None, Some(stage_eta)) => {
175                            info!(
176                                pipeline_stages = %pipeline_stages_progress,
177                                stage = %stage_id,
178                                checkpoint = %checkpoint.block_number,
179                                %target,
180                                %stage_eta,
181                                "{message}",
182                            )
183                        }
184                        (None, None) => {
185                            info!(
186                                pipeline_stages = %pipeline_stages_progress,
187                                stage = %stage_id,
188                                checkpoint = %checkpoint.block_number,
189                                %target,
190                                "{message}",
191                            )
192                        }
193                    }
194                }
195
196                if done {
197                    self.current_stage = None;
198                }
199            }
200            PipelineEvent::Unwind { stage_id, input } => {
201                let current_stage = CurrentStage {
202                    stage_id,
203                    eta: Eta::default(),
204                    checkpoint: input.checkpoint,
205                    target: Some(input.unwind_to),
206                    entities_checkpoint: input.checkpoint.entities(),
207                };
208
209                self.current_stage = Some(current_stage);
210            }
211            _ => (),
212        }
213    }
214
215    fn handle_consensus_engine_event<N: NodePrimitives>(&mut self, event: ConsensusEngineEvent<N>) {
216        match event {
217            ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
218                let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
219                    state;
220                if self.safe_block_hash != Some(safe_block_hash) &&
221                    self.finalized_block_hash != Some(finalized_block_hash)
222                {
223                    let msg = match status {
224                        ForkchoiceStatus::Valid => "Forkchoice updated",
225                        ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
226                        ForkchoiceStatus::Syncing => {
227                            "Received forkchoice updated message when syncing"
228                        }
229                    };
230                    info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
231                }
232                self.head_block_hash = Some(head_block_hash);
233                self.safe_block_hash = Some(safe_block_hash);
234                self.finalized_block_hash = Some(finalized_block_hash);
235            }
236            ConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
237                match live_sync_progress {
238                    ConsensusEngineLiveSyncProgress::DownloadingBlocks {
239                        remaining_blocks,
240                        target,
241                    } => {
242                        info!(
243                            remaining_blocks,
244                            target_block_hash=?target,
245                            "Live sync in progress, downloading blocks"
246                        );
247                    }
248                }
249            }
250            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
251                let block = executed.sealed_block();
252                let mut full = block.gas_used() as f64 * 100.0 / block.gas_limit() as f64;
253                if full.is_nan() {
254                    full = 0.0;
255                }
256                info!(
257                    number=block.number(),
258                    hash=?block.hash(),
259                    peers=self.num_connected_peers(),
260                    txs=block.body().transactions().len(),
261                    gas_used=%format_gas(block.gas_used()),
262                    gas_throughput=%format_gas_throughput(block.gas_used(), elapsed),
263                    gas_limit=%format_gas(block.gas_limit()),
264                    full=%format!("{:.1}%", full),
265                    base_fee=%format!("{:.2}Gwei", block.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
266                    blobs=block.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
267                    excess_blobs=block.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
268                    ?elapsed,
269                    "Block added to canonical chain"
270                );
271            }
272            ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
273                self.latest_block = Some(head.number());
274                info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
275            }
276            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
277                let block = executed.sealed_block();
278                info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
279            }
280            ConsensusEngineEvent::InvalidBlock(block) => {
281                warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
282            }
283            ConsensusEngineEvent::BlockReceived(num_hash) => {
284                info!(number=num_hash.number, hash=?num_hash.hash, "Received block from consensus engine");
285            }
286        }
287    }
288
289    fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
290        // If pipeline is running, it's fine to not receive any messages from the CL.
291        // So we need to report about CL health only when pipeline is idle.
292        if self.current_stage.is_none() {
293            match event {
294                ConsensusLayerHealthEvent::NeverSeen => {
295                    warn!(
296                        "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
297                    )
298                }
299                ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
300                    warn!(
301                        ?period,
302                        "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!"
303                    )
304                }
305                ConsensusLayerHealthEvent::NeverReceivedUpdates => {
306                    warn!(
307                        "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!"
308                    )
309                }
310                ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
311                    warn!(
312                        ?period,
313                        "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
314                    )
315                }
316            }
317        }
318    }
319
320    fn handle_pruner_event(&self, event: PrunerEvent) {
321        match event {
322            PrunerEvent::Started { tip_block_number } => {
323                debug!(tip_block_number, "Pruner started");
324            }
325            PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
326                let stats = format!(
327                    "[{}]",
328                    stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
329                );
330                debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
331            }
332        }
333    }
334
335    fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
336        match event {
337            StaticFileProducerEvent::Started { targets } => {
338                debug!(?targets, "Static File Producer started");
339            }
340            StaticFileProducerEvent::Finished { targets, elapsed } => {
341                debug!(?targets, ?elapsed, "Static File Producer finished");
342            }
343        }
344    }
345}
346
347/// Helper type for formatting of optional fields:
348/// - If [Some(x)], then `x` is written
349/// - If [None], then `None` is written
350struct OptionalField<T: Display>(Option<T>);
351
352impl<T: Display> Display for OptionalField<T> {
353    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
354        if let Some(field) = &self.0 {
355            write!(f, "{field}")
356        } else {
357            write!(f, "None")
358        }
359    }
360}
361
362/// The stage currently being executed.
363struct CurrentStage {
364    stage_id: StageId,
365    eta: Eta,
366    checkpoint: StageCheckpoint,
367    /// The entities checkpoint for reporting the progress. If `None`, then the progress is not
368    /// available, probably because the stage didn't finish running and didn't update its
369    /// checkpoint yet.
370    entities_checkpoint: Option<EntitiesCheckpoint>,
371    target: Option<BlockNumber>,
372}
373
374/// A node event.
375#[derive(Debug, derive_more::From)]
376pub enum NodeEvent<N: NodePrimitives> {
377    /// A sync pipeline event.
378    Pipeline(PipelineEvent),
379    /// A consensus engine event.
380    ConsensusEngine(ConsensusEngineEvent<N>),
381    /// A Consensus Layer health event.
382    ConsensusLayerHealth(ConsensusLayerHealthEvent),
383    /// A pruner event
384    Pruner(PrunerEvent),
385    /// A `static_file_producer` event
386    StaticFileProducer(StaticFileProducerEvent),
387    /// Used to encapsulate various conditions or situations that do not
388    /// naturally fit into the other more specific variants.
389    Other(String),
390}
391
392/// Displays relevant information to the user from components of the node, and periodically
393/// displays the high-level status of the node.
394pub async fn handle_events<E, N: NodePrimitives>(
395    peers_info: Option<Box<dyn PeersInfo>>,
396    latest_block_number: Option<BlockNumber>,
397    events: E,
398) where
399    E: Stream<Item = NodeEvent<N>> + Unpin,
400{
401    let state = NodeState::new(peers_info, latest_block_number);
402
403    let start = tokio::time::Instant::now() + Duration::from_secs(3);
404    let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
405    info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
406
407    let handler = EventHandler { state, events, info_interval };
408    handler.await
409}
410
411/// Handles events emitted by the node and logs them accordingly.
412#[pin_project::pin_project]
413struct EventHandler<E> {
414    state: NodeState,
415    #[pin]
416    events: E,
417    #[pin]
418    info_interval: Interval,
419}
420
421impl<E, N: NodePrimitives> Future for EventHandler<E>
422where
423    E: Stream<Item = NodeEvent<N>> + Unpin,
424{
425    type Output = ();
426
427    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
428        let mut this = self.project();
429
430        while this.info_interval.poll_tick(cx).is_ready() {
431            if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
432                &this.state.current_stage
433            {
434                let stage_progress =
435                    entities_checkpoint.and_then(|entities| entities.fmt_percentage());
436                let stage_eta = eta.fmt_for_stage(*stage_id);
437
438                match (stage_progress, stage_eta) {
439                    (Some(stage_progress), Some(stage_eta)) => {
440                        info!(
441                            target: "reth::cli",
442                            connected_peers = this.state.num_connected_peers(),
443                            stage = %stage_id,
444                            checkpoint = checkpoint.block_number,
445                            target = %OptionalField(*target),
446                            %stage_progress,
447                            %stage_eta,
448                            "Status"
449                        )
450                    }
451                    (Some(stage_progress), None) => {
452                        info!(
453                            target: "reth::cli",
454                            connected_peers = this.state.num_connected_peers(),
455                            stage = %stage_id,
456                            checkpoint = checkpoint.block_number,
457                            target = %OptionalField(*target),
458                            %stage_progress,
459                            "Status"
460                        )
461                    }
462                    (None, Some(stage_eta)) => {
463                        info!(
464                            target: "reth::cli",
465                            connected_peers = this.state.num_connected_peers(),
466                            stage = %stage_id,
467                            checkpoint = checkpoint.block_number,
468                            target = %OptionalField(*target),
469                            %stage_eta,
470                            "Status"
471                        )
472                    }
473                    (None, None) => {
474                        info!(
475                            target: "reth::cli",
476                            connected_peers = this.state.num_connected_peers(),
477                            stage = %stage_id,
478                            checkpoint = checkpoint.block_number,
479                            target = %OptionalField(*target),
480                            "Status"
481                        )
482                    }
483                }
484            } else {
485                let now =
486                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
487
488                // Only log status if we haven't logged recently
489                if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
490                    if let Some(latest_block) = this.state.latest_block {
491                        info!(
492                            target: "reth::cli",
493                            connected_peers = this.state.num_connected_peers(),
494                            %latest_block,
495                            "Status"
496                        );
497                    } else {
498                        info!(
499                            target: "reth::cli",
500                            connected_peers = this.state.num_connected_peers(),
501                            "Status"
502                        );
503                    }
504                    this.state.last_status_log_time = Some(now);
505                }
506            }
507        }
508
509        while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
510            match event {
511                NodeEvent::Pipeline(event) => {
512                    this.state.handle_pipeline_event(event);
513                }
514                NodeEvent::ConsensusEngine(event) => {
515                    this.state.handle_consensus_engine_event(event);
516                }
517                NodeEvent::ConsensusLayerHealth(event) => {
518                    this.state.handle_consensus_layer_health_event(event)
519                }
520                NodeEvent::Pruner(event) => {
521                    this.state.handle_pruner_event(event);
522                }
523                NodeEvent::StaticFileProducer(event) => {
524                    this.state.handle_static_file_producer_event(event);
525                }
526                NodeEvent::Other(event_description) => {
527                    warn!("{event_description}");
528                }
529            }
530        }
531
532        Poll::Pending
533    }
534}
535
536/// A container calculating the estimated time that a stage will complete in, based on stage
537/// checkpoints reported by the pipeline.
538///
539/// One `Eta` is only valid for a single stage.
540#[derive(Default, Copy, Clone)]
541struct Eta {
542    /// The last stage checkpoint
543    last_checkpoint: EntitiesCheckpoint,
544    /// The last time the stage reported its checkpoint
545    last_checkpoint_time: Option<Instant>,
546    /// The current ETA
547    eta: Option<Duration>,
548}
549
550impl Eta {
551    /// Update the ETA given the checkpoint, if possible.
552    fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
553        let Some(current) = checkpoint.entities() else { return };
554
555        if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
556            let Some(processed_since_last) =
557                current.processed.checked_sub(self.last_checkpoint.processed)
558            else {
559                self.eta = None;
560                debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
561                return
562            };
563            let elapsed = last_checkpoint_time.elapsed();
564            let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
565
566            let Some(remaining) = current.total.checked_sub(current.processed) else {
567                self.eta = None;
568                debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
569                return
570            };
571
572            self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
573        }
574
575        self.last_checkpoint = current;
576        self.last_checkpoint_time = Some(Instant::now());
577    }
578
579    /// Returns `true` if the ETA is available, i.e. at least one checkpoint has been reported.
580    fn is_available(&self) -> bool {
581        self.eta.zip(self.last_checkpoint_time).is_some()
582    }
583
584    /// Format ETA for a given stage.
585    ///
586    /// NOTE: Currently ETA is enabled only for the stages that have predictable progress.
587    /// It's not the case for network-dependent ([`StageId::Headers`] and [`StageId::Bodies`]) and
588    /// [`StageId::Execution`] stages.
589    fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
590        if !self.is_available() ||
591            matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
592        {
593            None
594        } else {
595            Some(self.to_string())
596        }
597    }
598}
599
600impl Display for Eta {
601    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
602        if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
603            let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
604
605            if let Some(remaining) = remaining {
606                return write!(
607                    f,
608                    "{}",
609                    humantime::format_duration(Duration::from_secs(remaining.as_secs()))
610                )
611            }
612        }
613
614        write!(f, "unknown")
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621
622    #[test]
623    fn eta_display_no_milliseconds() {
624        let eta = Eta {
625            last_checkpoint_time: Some(Instant::now()),
626            eta: Some(Duration::from_millis(
627                13 * 60 * 1000 + // Minutes
628                    37 * 1000 + // Seconds
629                    999, // Milliseconds
630            )),
631            ..Default::default()
632        }
633        .to_string();
634
635        assert_eq!(eta, "13m 37s");
636    }
637}