Skip to main content

reth_node_events/
node.rs

1//! Support for handling events emitted by node components.
2
3use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::{
5    constants::{GWEI_TO_WEI, MGAS_TO_GAS},
6    BlockHeader,
7};
8use alloy_primitives::{BlockNumber, B256};
9use alloy_rpc_types_engine::ForkchoiceState;
10use futures::Stream;
11use reth_engine_primitives::{ConsensusEngineEvent, ForkchoiceStatus, SlowBlockInfo};
12use reth_network_api::PeersInfo;
13use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
14use reth_prune_types::PrunerEvent;
15use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
16use reth_static_file_types::StaticFileProducerEvent;
17use std::{
18    fmt::{Display, Formatter},
19    future::Future,
20    pin::Pin,
21    task::{Context, Poll},
22    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
23};
24use tokio::time::Interval;
25use tracing::{debug, info, warn};
26
/// Period of the ticker that drives the periodic node status report.
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
29
30/// The current high-level state of the node, including the node's database environment, network
31/// connections, current processing stage, and the latest block information. It provides
32/// methods to handle different types of events that affect the node's state, such as pipeline
33/// events, network events, and consensus engine events.
34struct NodeState {
35    /// Information about connected peers.
36    peers_info: Option<Box<dyn PeersInfo>>,
37    /// The stage currently being executed.
38    current_stage: Option<CurrentStage>,
39    /// The latest block reached by either pipeline or consensus engine.
40    latest_block: Option<BlockNumber>,
41    /// Hash of the head block last set by fork choice update
42    head_block_hash: Option<B256>,
43    /// Hash of the safe block last set by fork choice update
44    safe_block_hash: Option<B256>,
45    /// Hash of finalized block last set by fork choice update
46    finalized_block_hash: Option<B256>,
47    /// The time when we last logged a status message
48    last_status_log_time: Option<u64>,
49}
50
51impl NodeState {
52    const fn new(
53        peers_info: Option<Box<dyn PeersInfo>>,
54        latest_block: Option<BlockNumber>,
55    ) -> Self {
56        Self {
57            peers_info,
58            current_stage: None,
59            latest_block,
60            head_block_hash: None,
61            safe_block_hash: None,
62            finalized_block_hash: None,
63            last_status_log_time: None,
64        }
65    }
66
67    fn num_connected_peers(&self) -> usize {
68        self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
69    }
70
71    fn build_current_stage(
72        &self,
73        stage_id: StageId,
74        checkpoint: StageCheckpoint,
75        target: Option<BlockNumber>,
76    ) -> CurrentStage {
77        let (eta, entities_checkpoint) = self
78            .current_stage
79            .as_ref()
80            .filter(|current_stage| current_stage.stage_id == stage_id)
81            .map_or_else(
82                || (Eta::default(), None),
83                |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
84            );
85
86        CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
87    }
88
89    /// Processes an event emitted by the pipeline
90    fn handle_pipeline_event(&mut self, event: PipelineEvent) {
91        match event {
92            PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
93                let checkpoint = checkpoint.unwrap_or_default();
94                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
95
96                info!(
97                    pipeline_stages = %pipeline_stages_progress,
98                    stage = %stage_id,
99                    checkpoint = %checkpoint.block_number,
100                    target = %OptionalField(target),
101                    "Preparing stage",
102                );
103
104                self.current_stage = Some(current_stage);
105            }
106            PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
107                let checkpoint = checkpoint.unwrap_or_default();
108                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
109
110                if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
111                    info!(
112                        pipeline_stages = %pipeline_stages_progress,
113                        stage = %stage_id,
114                        checkpoint = %checkpoint.block_number,
115                        target = %OptionalField(target),
116                        %stage_eta,
117                        "Executing stage",
118                    );
119                } else {
120                    info!(
121                        pipeline_stages = %pipeline_stages_progress,
122                        stage = %stage_id,
123                        checkpoint = %checkpoint.block_number,
124                        target = %OptionalField(target),
125                        "Executing stage",
126                    );
127                }
128
129                self.current_stage = Some(current_stage);
130            }
131            PipelineEvent::Ran {
132                pipeline_stages_progress,
133                stage_id,
134                result: ExecOutput { checkpoint, done },
135            } => {
136                if stage_id.is_finish() {
137                    self.latest_block = Some(checkpoint.block_number);
138                }
139
140                if let Some(current_stage) = self.current_stage.as_mut() {
141                    current_stage.checkpoint = checkpoint;
142                    current_stage.entities_checkpoint = checkpoint.entities();
143                    current_stage.eta.update(stage_id, checkpoint);
144
145                    let target = OptionalField(current_stage.target);
146                    let stage_progress = current_stage
147                        .entities_checkpoint
148                        .and_then(|entities| entities.fmt_percentage());
149                    let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
150
151                    let message = if done { "Finished stage" } else { "Committed stage progress" };
152
153                    match (stage_progress, stage_eta) {
154                        (Some(stage_progress), Some(stage_eta)) => {
155                            info!(
156                                pipeline_stages = %pipeline_stages_progress,
157                                stage = %stage_id,
158                                checkpoint = %checkpoint.block_number,
159                                %target,
160                                %stage_progress,
161                                %stage_eta,
162                                "{message}",
163                            )
164                        }
165                        (Some(stage_progress), None) => {
166                            info!(
167                                pipeline_stages = %pipeline_stages_progress,
168                                stage = %stage_id,
169                                checkpoint = %checkpoint.block_number,
170                                %target,
171                                %stage_progress,
172                                "{message}",
173                            )
174                        }
175                        (None, Some(stage_eta)) => {
176                            info!(
177                                pipeline_stages = %pipeline_stages_progress,
178                                stage = %stage_id,
179                                checkpoint = %checkpoint.block_number,
180                                %target,
181                                %stage_eta,
182                                "{message}",
183                            )
184                        }
185                        (None, None) => {
186                            info!(
187                                pipeline_stages = %pipeline_stages_progress,
188                                stage = %stage_id,
189                                checkpoint = %checkpoint.block_number,
190                                %target,
191                                "{message}",
192                            )
193                        }
194                    }
195                }
196
197                if done {
198                    self.current_stage = None;
199                }
200            }
201            PipelineEvent::Unwind { stage_id, input } => {
202                let current_stage = CurrentStage {
203                    stage_id,
204                    eta: Eta::default(),
205                    checkpoint: input.checkpoint,
206                    target: Some(input.unwind_to),
207                    entities_checkpoint: input.checkpoint.entities(),
208                };
209
210                self.current_stage = Some(current_stage);
211            }
212            PipelineEvent::Unwound { stage_id, result } => {
213                info!(stage = %stage_id, checkpoint = %result.checkpoint.block_number, "Unwound stage");
214                self.current_stage = None;
215            }
216            _ => (),
217        }
218    }
219
220    fn handle_consensus_engine_event<N: NodePrimitives>(&mut self, event: ConsensusEngineEvent<N>) {
221        match event {
222            ConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
223                let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
224                    state;
225                if self.safe_block_hash != Some(safe_block_hash) &&
226                    self.finalized_block_hash != Some(finalized_block_hash)
227                {
228                    let msg = match status {
229                        ForkchoiceStatus::Valid => "Forkchoice updated",
230                        ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
231                        ForkchoiceStatus::Syncing => {
232                            "Received forkchoice updated message when syncing"
233                        }
234                    };
235                    info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
236                }
237                self.head_block_hash = Some(head_block_hash);
238                self.safe_block_hash = Some(safe_block_hash);
239                self.finalized_block_hash = Some(finalized_block_hash);
240            }
241            ConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
242                let block = executed.sealed_block();
243                let mut full = block.gas_used() as f64 * 100.0 / block.gas_limit() as f64;
244                if full.is_nan() {
245                    full = 0.0;
246                }
247                info!(
248                    number=block.number(),
249                    hash=?block.hash(),
250                    peers=self.num_connected_peers(),
251                    txs=block.body().transactions().len(),
252                    gas_used=%format_gas(block.gas_used()),
253                    gas_throughput=%format_gas_throughput(block.gas_used(), elapsed),
254                    gas_limit=%format_gas(block.gas_limit()),
255                    full=%format!("{:.1}%", full),
256                    base_fee=%format!("{:.2}Gwei", block.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
257                    blobs=block.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
258                    excess_blobs=block.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
259                    ?elapsed,
260                    "Block added to canonical chain"
261                );
262            }
263            ConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
264                self.latest_block = Some(head.number());
265                info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
266            }
267            ConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
268                let block = executed.sealed_block();
269                info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
270            }
271            ConsensusEngineEvent::InvalidBlock(block) => {
272                warn!(number=block.number(), hash=?block.hash(), "Encountered invalid block");
273            }
274            ConsensusEngineEvent::BlockReceived(num_hash) => {
275                info!(number=num_hash.number, hash=?num_hash.hash, "Received new payload from consensus engine");
276            }
277            ConsensusEngineEvent::SlowBlock(info) => {
278                Self::log_slow_block(&info);
279            }
280        }
281    }
282
283    fn log_slow_block(info: &SlowBlockInfo) {
284        fn hit_rate(hits: usize, misses: usize) -> f64 {
285            let total = hits + misses;
286            if total > 0 {
287                (hits as f64 / total as f64) * 100.0
288            } else {
289                0.0
290            }
291        }
292
293        let stats = &info.stats;
294        let processing_secs =
295            stats.execution_duration.as_secs_f64() + stats.state_hash_duration.as_secs_f64();
296        let mgas_per_sec = if processing_secs > 0.0 {
297            (stats.gas_used as f64 / MGAS_TO_GAS as f64) / processing_secs
298        } else {
299            0.0
300        };
301
302        // Macro for the shared fields — commit_ms is only included when known
303        // (after persistence), omitted entirely for the immediate post-execution emit.
304        macro_rules! log_slow_block_fields {
305            ($($commit_field:tt)*) => {
306                warn!(
307                    target: "reth::slow_block",
308                    message = "Slow block",
309                    block.number = stats.block_number,
310                    block.hash = ?stats.block_hash,
311                    block.gas_used = stats.gas_used,
312                    block.tx_count = stats.tx_count,
313                    timing.execution_ms = stats.execution_duration.as_millis(),
314                    timing.state_read_ms = stats.state_read_duration.as_millis(),
315                    timing.state_hash_ms = stats.state_hash_duration.as_millis(),
316                    $($commit_field)*
317                    timing.total_ms = info.total_duration.as_millis(),
318                    throughput.mgas_per_sec = format!("{:.2}", mgas_per_sec),
319                    state_reads.accounts = stats.accounts_read,
320                    state_reads.storage_slots = stats.storage_read,
321                    state_reads.code = stats.code_read,
322                    state_reads.code_bytes = stats.code_bytes_read,
323                    state_writes.accounts = stats.accounts_changed,
324                    state_writes.accounts_deleted = stats.accounts_deleted,
325                    state_writes.storage_slots = stats.storage_slots_changed,
326                    state_writes.storage_slots_deleted = stats.storage_slots_deleted,
327                    state_writes.code = stats.bytecodes_changed,
328                    state_writes.code_bytes = stats.code_bytes_written,
329                    state_writes.eip7702_delegations_set = stats.eip7702_delegations_set,
330                    state_writes.eip7702_delegations_cleared = stats.eip7702_delegations_cleared,
331                    cache.account.hits = stats.account_cache_hits,
332                    cache.account.misses = stats.account_cache_misses,
333                    cache.account.hit_rate = format!("{:.2}", hit_rate(stats.account_cache_hits, stats.account_cache_misses)),
334                    cache.storage.hits = stats.storage_cache_hits,
335                    cache.storage.misses = stats.storage_cache_misses,
336                    cache.storage.hit_rate = format!("{:.2}", hit_rate(stats.storage_cache_hits, stats.storage_cache_misses)),
337                    cache.code.hits = stats.code_cache_hits,
338                    cache.code.misses = stats.code_cache_misses,
339                    cache.code.hit_rate = format!("{:.2}", hit_rate(stats.code_cache_hits, stats.code_cache_misses)),
340                );
341            }
342        }
343
344        if let Some(commit_dur) = info.commit_duration {
345            log_slow_block_fields!(timing.commit_ms = commit_dur.as_millis(),);
346        } else {
347            log_slow_block_fields!();
348        }
349    }
350
351    fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
352        // If pipeline is running, it's fine to not receive any messages from the CL.
353        // So we need to report about CL health only when pipeline is idle.
354        if self.current_stage.is_none() {
355            match event {
356                ConsensusLayerHealthEvent::NeverSeen => {
357                    warn!(
358                        "Post-merge network, but never seen beacon client. Please launch one to follow the chain!"
359                    )
360                }
361                ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
362                    warn!(
363                        ?period,
364                        "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!"
365                    )
366                }
367            }
368        }
369    }
370
371    fn handle_pruner_event(&self, event: PrunerEvent) {
372        match event {
373            PrunerEvent::Started { tip_block_number } => {
374                debug!(tip_block_number, "Pruner started");
375            }
376            PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
377                let stats = format!(
378                    "[{}]",
379                    stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
380                );
381                debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
382            }
383        }
384    }
385
386    fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
387        match event {
388            StaticFileProducerEvent::Started { targets } => {
389                debug!(?targets, "Static File Producer started");
390            }
391            StaticFileProducerEvent::Finished { targets, elapsed } => {
392                debug!(?targets, ?elapsed, "Static File Producer finished");
393            }
394        }
395    }
396}
397
/// Display adapter for optional log fields:
/// - `Some(x)` is rendered as `x`
/// - `None` is rendered as the literal text `None`
struct OptionalField<T: Display>(Option<T>);

impl<T: Display> Display for OptionalField<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match &self.0 {
            Some(field) => write!(f, "{field}"),
            None => write!(f, "None"),
        }
    }
}
412
413/// The stage currently being executed.
414struct CurrentStage {
415    stage_id: StageId,
416    eta: Eta,
417    checkpoint: StageCheckpoint,
418    /// The entities checkpoint for reporting the progress. If `None`, then the progress is not
419    /// available, probably because the stage didn't finish running and didn't update its
420    /// checkpoint yet.
421    entities_checkpoint: Option<EntitiesCheckpoint>,
422    target: Option<BlockNumber>,
423}
424
425/// A node event.
426#[derive(Debug, derive_more::From)]
427pub enum NodeEvent<N: NodePrimitives> {
428    /// A sync pipeline event.
429    Pipeline(PipelineEvent),
430    /// A consensus engine event.
431    ConsensusEngine(ConsensusEngineEvent<N>),
432    /// A Consensus Layer health event.
433    ConsensusLayerHealth(ConsensusLayerHealthEvent),
434    /// A pruner event
435    Pruner(PrunerEvent),
436    /// A `static_file_producer` event
437    StaticFileProducer(StaticFileProducerEvent),
438    /// Used to encapsulate various conditions or situations that do not
439    /// naturally fit into the other more specific variants.
440    Other(String),
441}
442
443/// Displays relevant information to the user from components of the node, and periodically
444/// displays the high-level status of the node.
445pub async fn handle_events<E, N: NodePrimitives>(
446    peers_info: Option<Box<dyn PeersInfo>>,
447    latest_block_number: Option<BlockNumber>,
448    events: E,
449) where
450    E: Stream<Item = NodeEvent<N>> + Unpin,
451{
452    let state = NodeState::new(peers_info, latest_block_number);
453
454    let start = tokio::time::Instant::now() + Duration::from_secs(3);
455    let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
456    info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
457
458    let handler = EventHandler { state, events, info_interval };
459    handler.await
460}
461
462/// Handles events emitted by the node and logs them accordingly.
463#[pin_project::pin_project]
464struct EventHandler<E> {
465    state: NodeState,
466    #[pin]
467    events: E,
468    #[pin]
469    info_interval: Interval,
470}
471
472impl<E, N: NodePrimitives> Future for EventHandler<E>
473where
474    E: Stream<Item = NodeEvent<N>> + Unpin,
475{
476    type Output = ();
477
478    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
479        let mut this = self.project();
480
481        while this.info_interval.poll_tick(cx).is_ready() {
482            if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
483                &this.state.current_stage
484            {
485                let stage_progress =
486                    entities_checkpoint.and_then(|entities| entities.fmt_percentage());
487                let stage_eta = eta.fmt_for_stage(*stage_id);
488
489                match (stage_progress, stage_eta) {
490                    (Some(stage_progress), Some(stage_eta)) => {
491                        info!(
492                            target: "reth::cli",
493                            connected_peers = this.state.num_connected_peers(),
494                            stage = %stage_id,
495                            checkpoint = checkpoint.block_number,
496                            target = %OptionalField(*target),
497                            %stage_progress,
498                            %stage_eta,
499                            "Status"
500                        )
501                    }
502                    (Some(stage_progress), None) => {
503                        info!(
504                            target: "reth::cli",
505                            connected_peers = this.state.num_connected_peers(),
506                            stage = %stage_id,
507                            checkpoint = checkpoint.block_number,
508                            target = %OptionalField(*target),
509                            %stage_progress,
510                            "Status"
511                        )
512                    }
513                    (None, Some(stage_eta)) => {
514                        info!(
515                            target: "reth::cli",
516                            connected_peers = this.state.num_connected_peers(),
517                            stage = %stage_id,
518                            checkpoint = checkpoint.block_number,
519                            target = %OptionalField(*target),
520                            %stage_eta,
521                            "Status"
522                        )
523                    }
524                    (None, None) => {
525                        info!(
526                            target: "reth::cli",
527                            connected_peers = this.state.num_connected_peers(),
528                            stage = %stage_id,
529                            checkpoint = checkpoint.block_number,
530                            target = %OptionalField(*target),
531                            "Status"
532                        )
533                    }
534                }
535            } else {
536                let now =
537                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
538
539                // Only log status if we haven't logged recently
540                if now.saturating_sub(this.state.last_status_log_time.unwrap_or(0)) > 60 {
541                    if let Some(latest_block) = this.state.latest_block {
542                        info!(
543                            target: "reth::cli",
544                            connected_peers = this.state.num_connected_peers(),
545                            %latest_block,
546                            "Status"
547                        );
548                    } else {
549                        info!(
550                            target: "reth::cli",
551                            connected_peers = this.state.num_connected_peers(),
552                            "Status"
553                        );
554                    }
555                    this.state.last_status_log_time = Some(now);
556                }
557            }
558        }
559
560        while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
561            match event {
562                NodeEvent::Pipeline(event) => {
563                    this.state.handle_pipeline_event(event);
564                }
565                NodeEvent::ConsensusEngine(event) => {
566                    this.state.handle_consensus_engine_event(event);
567                }
568                NodeEvent::ConsensusLayerHealth(event) => {
569                    this.state.handle_consensus_layer_health_event(event)
570                }
571                NodeEvent::Pruner(event) => {
572                    this.state.handle_pruner_event(event);
573                }
574                NodeEvent::StaticFileProducer(event) => {
575                    this.state.handle_static_file_producer_event(event);
576                }
577                NodeEvent::Other(event_description) => {
578                    warn!("{event_description}");
579                }
580            }
581        }
582
583        Poll::Pending
584    }
585}
586
587/// A container calculating the estimated time that a stage will complete in, based on stage
588/// checkpoints reported by the pipeline.
589///
590/// One `Eta` is only valid for a single stage.
591#[derive(Default, Copy, Clone)]
592struct Eta {
593    /// The last stage checkpoint
594    last_checkpoint: EntitiesCheckpoint,
595    /// The last time the stage reported its checkpoint
596    last_checkpoint_time: Option<Instant>,
597    /// The current ETA
598    eta: Option<Duration>,
599}
600
601impl Eta {
602    /// Update the ETA given the checkpoint, if possible.
603    fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
604        let Some(current) = checkpoint.entities() else { return };
605
606        if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
607            let Some(processed_since_last) =
608                current.processed.checked_sub(self.last_checkpoint.processed)
609            else {
610                self.eta = None;
611                debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
612                return
613            };
614            let elapsed = last_checkpoint_time.elapsed();
615            let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
616
617            let Some(remaining) = current.total.checked_sub(current.processed) else {
618                self.eta = None;
619                debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
620                return
621            };
622
623            self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
624        }
625
626        self.last_checkpoint = current;
627        self.last_checkpoint_time = Some(Instant::now());
628    }
629
630    /// Returns `true` if the ETA is available, i.e. at least one checkpoint has been reported.
631    fn is_available(&self) -> bool {
632        self.eta.zip(self.last_checkpoint_time).is_some()
633    }
634
635    /// Format ETA for a given stage.
636    ///
637    /// NOTE: Currently ETA is enabled only for the stages that have predictable progress.
638    /// It's not the case for network-dependent ([`StageId::Headers`] and [`StageId::Bodies`]) and
639    /// [`StageId::Execution`] stages.
640    fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
641        if !self.is_available() ||
642            matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
643        {
644            None
645        } else {
646            Some(self.to_string())
647        }
648    }
649}
650
651impl Display for Eta {
652    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
653        if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
654            let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
655
656            if let Some(remaining) = remaining {
657                return write!(
658                    f,
659                    "{}",
660                    humantime::format_duration(Duration::from_secs(remaining.as_secs()))
661                        .to_string()
662                        .replace(' ', "")
663                )
664            }
665        }
666
667        write!(f, "unknown")
668    }
669}
670
#[cfg(test)]
mod tests {
    use super::*;

    /// The rendered ETA must not contain sub-second precision.
    #[test]
    fn eta_display_no_milliseconds() {
        let minutes = Duration::from_secs(13 * 60);
        let seconds = Duration::from_secs(37);
        let milliseconds = Duration::from_millis(999);

        let eta = Eta {
            last_checkpoint_time: Some(Instant::now()),
            eta: Some(minutes + seconds + milliseconds),
            ..Default::default()
        };

        assert_eq!(eta.to_string(), "13m37s");
    }
}