Skip to main content

reth_node_events/
node.rs

1//! Support for handling events emitted by node components.
2
3use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader};
5use alloy_primitives::{BlockNumber, B256};
6use alloy_rpc_types_engine::ForkchoiceState;
7use futures::Stream;
8use reth_engine_primitives::{ConsensusEngineEvent, ForkchoiceStatus};
9use reth_network_api::PeersInfo;
10use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
11use reth_prune_types::PrunerEvent;
12use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
13use reth_static_file_types::StaticFileProducerEvent;
14use std::{
15    fmt::{Display, Formatter},
16    future::Future,
17    pin::Pin,
18    task::{Context, Poll},
19    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
20};
21use tokio::time::Interval;
22use tracing::{debug, info, warn};
23
24/// Interval of reporting node state.
25const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
26
27/// The current high-level state of the node, including the node's database environment, network
28/// connections, current processing stage, and the latest block information. It provides
29/// methods to handle different types of events that affect the node's state, such as pipeline
30/// events, network events, and consensus engine events.
31struct NodeState {
32    /// Information about connected peers.
33    peers_info: Option<Box<dyn PeersInfo>>,
34    /// The stage currently being executed.
35    current_stage: Option<CurrentStage>,
36    /// The latest block reached by either pipeline or consensus engine.
37    latest_block: Option<BlockNumber>,
38    /// Hash of the head block last set by fork choice update
39    head_block_hash: Option<B256>,
40    /// Hash of the safe block last set by fork choice update
41    safe_block_hash: Option<B256>,
42    /// Hash of finalized block last set by fork choice update
43    finalized_block_hash: Option<B256>,
44    /// The time when we last logged a status message
45    last_status_log_time: Option<u64>,
46}
47
48impl NodeState {
49    const fn new(
50        peers_info: Option<Box<dyn PeersInfo>>,
51        latest_block: Option<BlockNumber>,
52    ) -> Self {
53        Self {
54            peers_info,
55            current_stage: None,
56            latest_block,
57            head_block_hash: None,
58            safe_block_hash: None,
59            finalized_block_hash: None,
60            last_status_log_time: None,
61        }
62    }
63
64    fn num_connected_peers(&self) -> usize {
65        self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
66    }
67
68    fn build_current_stage(
69        &self,
70        stage_id: StageId,
71        checkpoint: StageCheckpoint,
72        target: Option<BlockNumber>,
73    ) -> CurrentStage {
74        let (eta, entities_checkpoint) = self
75            .current_stage
76            .as_ref()
77            .filter(|current_stage| current_stage.stage_id == stage_id)
78            .map_or_else(
79                || (Eta::default(), None),
80                |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
81            );
82
83        CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
84    }
85
86    /// Processes an event emitted by the pipeline
87    fn handle_pipeline_event(&mut self, event: PipelineEvent) {
88        match event {
89            PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
90                let checkpoint = checkpoint.unwrap_or_default();
91                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
92
93                info!(
94                    pipeline_stages = %pipeline_stages_progress,
95                    stage = %stage_id,
96                    checkpoint = %checkpoint.block_number,
97                    target = %OptionalField(target),
98                    "Preparing stage",
99                );
100
101                self.current_stage = Some(current_stage);
102            }
103            PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
104                let checkpoint = checkpoint.unwrap_or_default();
105                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
106
107                if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
108                    info!(
109                        pipeline_stages = %pipeline_stages_progress,
110                        stage = %stage_id,
111                        checkpoint = %checkpoint.block_number,
112                        target = %OptionalField(target),
113                        %stage_eta,
114                        "Executing stage",
115                    );
116                } else {
117                    info!(
118                        pipeline_stages = %pipeline_stages_progress,
119                        stage = %stage_id,
120                        checkpoint = %checkpoint.block_number,
121                        target = %OptionalField(target),
122                        "Executing stage",
123                    );
124                }
125
126                self.current_stage = Some(current_stage);
127            }
128            PipelineEvent::Ran {
129                pipeline_stages_progress,
130                stage_id,
131                result: ExecOutput { checkpoint, done },
132            } => {
133                if stage_id.is_finish() {
134                    self.latest_block = Some(checkpoint.block_number);
135                }
136
137                if let Some(current_stage) = self.current_stage.as_mut() {
138                    current_stage.checkpoint = checkpoint;
139                    current_stage.entities_checkpoint = checkpoint.entities();
140                    current_stage.eta.update(stage_id, checkpoint);
141
142                    let target = OptionalField(current_stage.target);
143                    let stage_progress = current_stage
144                        .entities_checkpoint
145                        .and_then(|entities| entities.fmt_percentage());
146                    let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
147
148                    let message = if done { "Finished stage" } else { "Committed stage progress" };
149
150                    match (stage_progress, stage_eta) {
151                        (Some(stage_progress), Some(stage_eta)) => {
152                            info!(
153                                pipeline_stages = %pipeline_stages_progress,
154                                stage = %stage_id,
155                                checkpoint = %checkpoint.block_number,
156                                %target,
157                                %stage_progress,
158                                %stage_eta,
159                                "{message}",
160                            )
161                        }
162                        (Some(stage_progress), None) => {
163                            info!(
164                                pipeline_stages = %pipeline_stages_progress,
165                                stage = %stage_id,
166                                checkpoint = %checkpoint.block_number,
167                                %target,
168                                %stage_progress,
169                                "{message}",
170                            )
171                        }
172                        (None, Some(stage_eta)) => {
173                            info!(
174                                pipeline_stages = %pipeline_stages_progress,
175                                stage = %stage_id,
176                                checkpoint = %checkpoint.block_number,
177                                %target,
178                                %stage_eta,
179                                "{message}",
180                            )
181                        }
182                        (None, None) => {
183                            info!(
184                                pipeline_stages = %pipeline_stages_progress,
185                                stage = %stage_id,
186                                checkpoint = %checkpoint.block_number,
187                                %target,
188                                "{message}",
189                            )
190                        }
191                    }
192                }
193
194                if done {
195                    self.current_stage = None;
196                }
197            }
198            PipelineEvent::Unwind { stage_id, input } => {
199                let current_stage = CurrentStage {
200                    stage_id,
201                    eta: Eta::default(),
202                    checkpoint: input.checkpoint,
203                    target: Some(input.unwind_to),
204                    entities_checkpoint: input.checkpoint.entities(),
205                };
206
207                self.current_stage = Some(current_stage);
208            }
209            PipelineEvent::Unwound { stage_id, result } => {
210                info!(stage = %stage_id, checkpoint = %result.checkpoint.block_number, "Unwound stage");
211                self.current_stage = None;
212            }
213            _ => (),
214        }
215    }
216
217    fn handle_consensus_engine_event<N: NodePrimitives>(&mut self, event: ConsensusEngineEvent<N>) {
218        match event {
219            ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
220                let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
221                    state;
222                if self.safe_block_hash != Some(safe_block_hash) &&
223                    self.finalized_block_hash != Some(finalized_block_hash)
224                {
225                    let msg = match status {
226                        ForkchoiceStatus::Valid => "Forkchoice updated",
227                        ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
228                        ForkchoiceStatus::Syncing => {
229                            "Received forkchoice updated message when syncing"
230                        }
231                    };
232                    info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
233                }
234                self.head_block_hash = Some(head_block_hash);
235                self.safe_block_hash = Some(safe_block_hash);
236                self.finalized_block_hash = Some(finalized_block_hash);
237            }
238            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
239                let block = executed.sealed_block();
240                let mut full = block.gas_used() as f64 * 100.0 / block.gas_limit() as f64;
241                if full.is_nan() {
242                    full = 0.0;
243                }
244                info!(
245                    number=block.number(),
246                    hash=?block.hash(),
247                    peers=self.num_connected_peers(),
248                    txs=block.body().transactions().len(),
249                    gas_used=%format_gas(block.gas_used()),
250                    gas_throughput=%format_gas_throughput(block.gas_used(), elapsed),
251                    gas_limit=%format_gas(block.gas_limit()),
252                    full=%format!("{:.1}%", full),
253                    base_fee=%format!("{:.2}Gwei", block.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
254                    blobs=block.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
255                    excess_blobs=block.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
256                    ?elapsed,
257                    "Block added to canonical chain"
258                );
259            }
260            ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
261                self.latest_block = Some(head.number());
262                info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
263            }
264            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
265                let block = executed.sealed_block();
266                info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
267            }
268            ConsensusEngineEvent::InvalidBlock(block) => {
269                warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
270            }
271            ConsensusEngineEvent::BlockReceived(num_hash) => {
272                info!(number=num_hash.number, hash=?num_hash.hash, "Received new payload from consensus engine");
273            }
274        }
275    }
276
277    fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
278        // If pipeline is running, it's fine to not receive any messages from the CL.
279        // So we need to report about CL health only when pipeline is idle.
280        if self.current_stage.is_none() {
281            match event {
282                ConsensusLayerHealthEvent::NeverSeen => {
283                    warn!(
284                        "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
285                    )
286                }
287                ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
288                    warn!(
289                        ?period,
290                        "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
291                    )
292                }
293            }
294        }
295    }
296
297    fn handle_pruner_event(&self, event: PrunerEvent) {
298        match event {
299            PrunerEvent::Started { tip_block_number } => {
300                debug!(tip_block_number, "Pruner started");
301            }
302            PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
303                let stats = format!(
304                    "[{}]",
305                    stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
306                );
307                debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
308            }
309        }
310    }
311
312    fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
313        match event {
314            StaticFileProducerEvent::Started { targets } => {
315                debug!(?targets, "Static File Producer started");
316            }
317            StaticFileProducerEvent::Finished { targets, elapsed } => {
318                debug!(?targets, ?elapsed, "Static File Producer finished");
319            }
320        }
321    }
322}
323
324/// Helper type for formatting of optional fields:
325/// - If [Some(x)], then `x` is written
326/// - If [None], then `None` is written
327struct OptionalField<T: Display>(Option<T>);
328
329impl<T: Display> Display for OptionalField<T> {
330    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
331        if let Some(field) = &self.0 {
332            write!(f, "{field}")
333        } else {
334            write!(f, "None")
335        }
336    }
337}
338
339/// The stage currently being executed.
340struct CurrentStage {
341    stage_id: StageId,
342    eta: Eta,
343    checkpoint: StageCheckpoint,
344    /// The entities checkpoint for reporting the progress. If `None`, then the progress is not
345    /// available, probably because the stage didn't finish running and didn't update its
346    /// checkpoint yet.
347    entities_checkpoint: Option<EntitiesCheckpoint>,
348    target: Option<BlockNumber>,
349}
350
351/// A node event.
352#[derive(Debug, derive_more::From)]
353pub enum NodeEvent<N: NodePrimitives> {
354    /// A sync pipeline event.
355    Pipeline(PipelineEvent),
356    /// A consensus engine event.
357    ConsensusEngine(ConsensusEngineEvent<N>),
358    /// A Consensus Layer health event.
359    ConsensusLayerHealth(ConsensusLayerHealthEvent),
360    /// A pruner event
361    Pruner(PrunerEvent),
362    /// A `static_file_producer` event
363    StaticFileProducer(StaticFileProducerEvent),
364    /// Used to encapsulate various conditions or situations that do not
365    /// naturally fit into the other more specific variants.
366    Other(String),
367}
368
369/// Displays relevant information to the user from components of the node, and periodically
370/// displays the high-level status of the node.
371pub async fn handle_events<E, N: NodePrimitives>(
372    peers_info: Option<Box<dyn PeersInfo>>,
373    latest_block_number: Option<BlockNumber>,
374    events: E,
375) where
376    E: Stream<Item = NodeEvent<N>> + Unpin,
377{
378    let state = NodeState::new(peers_info, latest_block_number);
379
380    let start = tokio::time::Instant::now() + Duration::from_secs(3);
381    let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
382    info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
383
384    let handler = EventHandler { state, events, info_interval };
385    handler.await
386}
387
388/// Handles events emitted by the node and logs them accordingly.
389#[pin_project::pin_project]
390struct EventHandler<E> {
391    state: NodeState,
392    #[pin]
393    events: E,
394    #[pin]
395    info_interval: Interval,
396}
397
398impl<E, N: NodePrimitives> Future for EventHandler<E>
399where
400    E: Stream<Item = NodeEvent<N>> + Unpin,
401{
402    type Output = ();
403
404    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
405        let mut this = self.project();
406
407        while this.info_interval.poll_tick(cx).is_ready() {
408            if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
409                &this.state.current_stage
410            {
411                let stage_progress =
412                    entities_checkpoint.and_then(|entities| entities.fmt_percentage());
413                let stage_eta = eta.fmt_for_stage(*stage_id);
414
415                match (stage_progress, stage_eta) {
416                    (Some(stage_progress), Some(stage_eta)) => {
417                        info!(
418                            target: "reth::cli",
419                            connected_peers = this.state.num_connected_peers(),
420                            stage = %stage_id,
421                            checkpoint = checkpoint.block_number,
422                            target = %OptionalField(*target),
423                            %stage_progress,
424                            %stage_eta,
425                            "Status"
426                        )
427                    }
428                    (Some(stage_progress), None) => {
429                        info!(
430                            target: "reth::cli",
431                            connected_peers = this.state.num_connected_peers(),
432                            stage = %stage_id,
433                            checkpoint = checkpoint.block_number,
434                            target = %OptionalField(*target),
435                            %stage_progress,
436                            "Status"
437                        )
438                    }
439                    (None, Some(stage_eta)) => {
440                        info!(
441                            target: "reth::cli",
442                            connected_peers = this.state.num_connected_peers(),
443                            stage = %stage_id,
444                            checkpoint = checkpoint.block_number,
445                            target = %OptionalField(*target),
446                            %stage_eta,
447                            "Status"
448                        )
449                    }
450                    (None, None) => {
451                        info!(
452                            target: "reth::cli",
453                            connected_peers = this.state.num_connected_peers(),
454                            stage = %stage_id,
455                            checkpoint = checkpoint.block_number,
456                            target = %OptionalField(*target),
457                            "Status"
458                        )
459                    }
460                }
461            } else {
462                let now =
463                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
464
465                // Only log status if we haven't logged recently
466                if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
467                    if let Some(latest_block) = this.state.latest_block {
468                        info!(
469                            target: "reth::cli",
470                            connected_peers = this.state.num_connected_peers(),
471                            %latest_block,
472                            "Status"
473                        );
474                    } else {
475                        info!(
476                            target: "reth::cli",
477                            connected_peers = this.state.num_connected_peers(),
478                            "Status"
479                        );
480                    }
481                    this.state.last_status_log_time = Some(now);
482                }
483            }
484        }
485
486        while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
487            match event {
488                NodeEvent::Pipeline(event) => {
489                    this.state.handle_pipeline_event(event);
490                }
491                NodeEvent::ConsensusEngine(event) => {
492                    this.state.handle_consensus_engine_event(event);
493                }
494                NodeEvent::ConsensusLayerHealth(event) => {
495                    this.state.handle_consensus_layer_health_event(event)
496                }
497                NodeEvent::Pruner(event) => {
498                    this.state.handle_pruner_event(event);
499                }
500                NodeEvent::StaticFileProducer(event) => {
501                    this.state.handle_static_file_producer_event(event);
502                }
503                NodeEvent::Other(event_description) => {
504                    warn!("{event_description}");
505                }
506            }
507        }
508
509        Poll::Pending
510    }
511}
512
513/// A container calculating the estimated time that a stage will complete in, based on stage
514/// checkpoints reported by the pipeline.
515///
516/// One `Eta` is only valid for a single stage.
517#[derive(Default, Copy, Clone)]
518struct Eta {
519    /// The last stage checkpoint
520    last_checkpoint: EntitiesCheckpoint,
521    /// The last time the stage reported its checkpoint
522    last_checkpoint_time: Option<Instant>,
523    /// The current ETA
524    eta: Option<Duration>,
525}
526
527impl Eta {
528    /// Update the ETA given the checkpoint, if possible.
529    fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
530        let Some(current) = checkpoint.entities() else { return };
531
532        if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
533            let Some(processed_since_last) =
534                current.processed.checked_sub(self.last_checkpoint.processed)
535            else {
536                self.eta = None;
537                debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
538                return
539            };
540            let elapsed = last_checkpoint_time.elapsed();
541            let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
542
543            let Some(remaining) = current.total.checked_sub(current.processed) else {
544                self.eta = None;
545                debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
546                return
547            };
548
549            self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
550        }
551
552        self.last_checkpoint = current;
553        self.last_checkpoint_time = Some(Instant::now());
554    }
555
556    /// Returns `true` if the ETA is available, i.e. at least one checkpoint has been reported.
557    fn is_available(&self) -> bool {
558        self.eta.zip(self.last_checkpoint_time).is_some()
559    }
560
561    /// Format ETA for a given stage.
562    ///
563    /// NOTE: Currently ETA is enabled only for the stages that have predictable progress.
564    /// It's not the case for network-dependent ([`StageId::Headers`] and [`StageId::Bodies`]) and
565    /// [`StageId::Execution`] stages.
566    fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
567        if !self.is_available() ||
568            matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
569        {
570            None
571        } else {
572            Some(self.to_string())
573        }
574    }
575}
576
577impl Display for Eta {
578    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579        if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
580            let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
581
582            if let Some(remaining) = remaining {
583                return write!(
584                    f,
585                    "{}",
586                    humantime::format_duration(Duration::from_secs(remaining.as_secs()))
587                        .to_string()
588                        .replace(' ', "")
589                )
590            }
591        }
592
593        write!(f, "unknown")
594    }
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600
601    #[test]
602    fn eta_display_no_milliseconds() {
603        let eta = Eta {
604            last_checkpoint_time: Some(Instant::now()),
605            eta: Some(Duration::from_millis(
606                13 * 60 * 1000 + // Minutes
607                    37 * 1000 + // Seconds
608                    999, // Milliseconds
609            )),
610            ..Default::default()
611        }
612        .to_string();
613
614        assert_eq!(eta, "13m37s");
615    }
616}