Skip to main content

reth_stages_api/pipeline/
mod.rs

1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10    providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11    ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12    PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, StorageSettingsCache,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18    pin::Pin,
19    time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29    BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30    StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A container for a queued stage.
///
/// Stages are stored boxed behind the [`Stage`] trait so heterogeneous stage types can share
/// one queue.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The pipeline type itself with the result of [`Pipeline::run_as_fut`]
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
/// of the chain and the pipeline then executes each stage in order from the current local chain tip
/// and the external chain tip. When a stage is executed, it will run until it reaches the chain
/// tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [`Pipeline::set_max_block`]).
///
/// `include_mmd!("docs/mermaid/pipeline.mmd")`
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [`Pipeline::unwind`]).
///
/// # Defaults
///
/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Provider factory.
    provider_factory: ProviderFactory<N>,
    /// All configured stages in the order they will be executed.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// The maximum block number to sync to.
    max_block: Option<BlockNumber>,
    /// Producer used to move finished data from the database into static files
    /// (see [`Pipeline::move_to_static_files`]); only relevant for legacy storage.v1.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender for events the pipeline emits.
    event_sender: EventSender<PipelineEvent>,
    /// Keeps track of the progress of the pipeline.
    progress: PipelineProgress,
    /// A Sender for the current chain tip to sync to.
    ///
    /// This is used to notify the headers stage about a new sync target.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional channel used to report per-stage checkpoint metrics.
    metrics_tx: Option<MetricEventsSender>,
    /// Whether an unwind should fail the syncing process. Should only be set when downloading
    /// blocks from trusted sources and expecting them to be valid.
    fail_on_unwind: bool,
    /// Block that was chosen as a target of the last unwind triggered by
    /// [`StageError::DetachedHead`] error.
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive unwind attempts due to [`StageError::DetachedHead`] for the current
    /// fork.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98    /// Construct a pipeline using a [`PipelineBuilder`].
99    pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100    {
101        PipelineBuilder::default()
102    }
103
104    /// Return the minimum block number achieved by
105    /// any stage during the execution of the pipeline.
106    pub const fn minimum_block_number(&self) -> Option<u64> {
107        self.progress.minimum_block_number
108    }
109
110    /// Set tip for reverse sync.
111    #[track_caller]
112    pub fn set_tip(&self, tip: B256) {
113        let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114            warn!(target: "sync::pipeline", "Chain tip channel closed");
115        });
116    }
117
118    /// Listen for events on the pipeline.
119    pub fn events(&self) -> EventStream<PipelineEvent> {
120        self.event_sender.new_listener()
121    }
122
123    /// Get a mutable reference to a stage by index.
124    pub fn stage(
125        &mut self,
126        idx: usize,
127    ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128        &mut self.stages[idx]
129    }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133    /// Registers progress metrics for each registered stage
134    pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135        let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136        let provider = self.provider_factory.provider()?;
137
138        for stage in &self.stages {
139            let stage_id = stage.id();
140            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141                stage_id,
142                checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143                max_block_number: None,
144                elapsed: Duration::default(),
145            });
146        }
147        Ok(())
148    }
149
    /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
    /// pipeline and its result as a future.
    ///
    /// Ownership of the pipeline moves into the returned future and is handed back alongside
    /// the run result (see [`PipelineWithResult`]).
    #[track_caller]
    pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
        // Metrics registration failures are non-fatal; the run proceeds regardless.
        let _ = self.register_metrics();
        Box::pin(async move {
            // NOTE: the tip should only be None if we are in continuous sync mode.
            if let Some(target) = target {
                match target {
                    // Forward sync: notify the headers stage of the new target and fall
                    // through to the regular run loop below.
                    PipelineTarget::Sync(tip) => self.set_tip(tip),
                    // Manual unwind: backfill static files first, then unwind and return
                    // without executing the stages forward.
                    PipelineTarget::Unwind(target) => {
                        if let Err(err) = self.move_to_static_files() {
                            return (self, Err(err.into()))
                        }
                        if let Err(err) = self.unwind(target, None) {
                            return (self, Err(err))
                        }
                        self.progress.update(target);

                        return (self, Ok(ControlFlow::Continue { block_number: target }))
                    }
                }
            }

            let result = self.run_loop().await;
            trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
            (self, result)
        })
    }
179
    /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
    /// a `max_block` in the pipeline.
    ///
    /// # Errors
    ///
    /// Returns [`PipelineError::UnexpectedUnwind`] if a pass ended in an unwind while
    /// `fail_on_unwind` is set; otherwise propagates errors from [`Self::run_loop`].
    pub async fn run(&mut self) -> Result<(), PipelineError> {
        let _ = self.register_metrics(); // ignore error

        loop {
            // One full pass over all stages.
            let next_action = self.run_loop().await?;

            if next_action.is_unwind() && self.fail_on_unwind {
                return Err(PipelineError::UnexpectedUnwind)
            }

            // Terminate the loop early if it's reached the maximum user
            // configured block.
            if next_action.should_continue() &&
                self.progress
                    .minimum_block_number
                    .zip(self.max_block)
                    .is_some_and(|(progress, target)| progress >= target)
            {
                trace!(
                    target: "sync::pipeline",
                    ?next_action,
                    minimum_block_number = ?self.progress.minimum_block_number,
                    max_block = ?self.max_block,
                    "Terminating pipeline."
                );
                return Ok(())
            }
        }
    }
211
    /// Performs one pass of the pipeline across all stages. After successful
    /// execution of each stage, it proceeds to commit it to the database.
    ///
    /// If any stage is unsuccessful at execution, we proceed to
    /// unwind. This will undo the progress across the entire pipeline
    /// up to the block that caused the error.
    ///
    /// Returns the control flow after it ran the pipeline.
    /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage in
    /// the pipeline (for example the `Finish` stage). Or [`ControlFlow::Unwind`] of the stage
    /// that caused the unwind.
    pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
        // Backfill static files from the database first (no-op on storage.v2).
        self.move_to_static_files()?;

        let mut previous_stage = None;
        for stage_index in 0..self.stages.len() {
            let stage = &self.stages[stage_index];
            let stage_id = stage.id();

            trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
            let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;

            trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");

            match next {
                ControlFlow::NoProgress { block_number } => {
                    if let Some(block_number) = block_number {
                        self.progress.update(block_number);
                    }
                }
                ControlFlow::Continue { block_number } => self.progress.update(block_number),
                ControlFlow::Unwind { target, bad_block } => {
                    // A stage hit a bad block: unwind the whole pipeline and surface the
                    // unwind to the caller instead of continuing with later stages.
                    self.unwind(target, Some(bad_block.block.number))?;
                    return Ok(ControlFlow::Unwind { target, bad_block })
                }
            }

            // The checkpoint of the stage that just ran becomes the sync target for the
            // next stage in the queue.
            previous_stage = Some(
                self.provider_factory
                    .provider()?
                    .get_stage_checkpoint(stage_id)?
                    .unwrap_or_default()
                    .block_number,
            );
        }

        Ok(self.progress.next_ctrl())
    }
260
261    /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
262    /// all data from the database to static files for corresponding
263    /// [segments](reth_static_file_types::StaticFileSegment), according to their [stage
264    /// checkpoints](StageCheckpoint):
265    /// - [`StaticFileSegment::Headers`](reth_static_file_types::StaticFileSegment::Headers) ->
266    ///   [`StageId::Headers`]
267    /// - [`StaticFileSegment::Receipts`](reth_static_file_types::StaticFileSegment::Receipts) ->
268    ///   [`StageId::Execution`]
269    /// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
270    ///   -> [`StageId::Bodies`]
271    ///
272    /// This is a legacy storage.v1 backfill step. Storage.v2 writes directly to static files and
273    /// `RocksDB`, so there is no MDBX -> static-file migration to perform.
274    ///
275    /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
276    /// lock is occupied.
277    pub fn move_to_static_files(&self) -> RethResult<()> {
278        if self.provider_factory.cached_storage_settings().is_v2() {
279            return Ok(())
280        }
281
282        // Copies data from database to static files
283        let lowest_static_file_height =
284            self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
285
286        // Deletes data which has been copied to static files.
287        if let Some(prune_tip) = lowest_static_file_height {
288            // Run the pruner so we don't potentially end up with higher height in the database vs
289            // static files during a pipeline unwind
290            let mut pruner = PrunerBuilder::new(Default::default())
291                .delete_limit(usize::MAX)
292                .build_with_provider_factory(self.provider_factory.clone());
293
294            pruner.run(prune_tip)?;
295        }
296
297        Ok(())
298    }
299
    /// Unwind the stages to the target block (exclusive).
    ///
    /// If the unwind is due to a bad block the number of that block should be specified.
    ///
    /// Stages are unwound in reverse order of execution; each stage's progress is committed
    /// after every successful unwind step.
    ///
    /// # Errors
    ///
    /// Fails if the target is below the pruned history, if any provider operation fails, or if
    /// a stage's unwind itself fails (reported as [`StageError::Fatal`]).
    pub fn unwind(
        &mut self,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> Result<(), PipelineError> {
        // Add validation before starting unwind: ensure the unwind target has not been pruned
        // away, otherwise we could not rebuild state from it.
        let (latest_block, prune_modes, checkpoints) = {
            let provider = self.provider_factory.provider()?;
            (
                provider.last_block_number()?,
                provider.prune_modes_ref().clone(),
                provider.get_prune_checkpoints()?,
            )
        };
        prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;

        // Unwind stages in reverse order of execution
        let unwind_pipeline = self.stages.iter_mut().rev();

        // Legacy Engine: This prevents a race condition in which the `StaticFileProducer` could
        // attempt to proceed with a finalized block which has been unwinded
        let _locked_sf_producer = self.static_file_producer.lock();

        // Long-read-transaction safety is disabled because an unwind can legitimately hold the
        // transaction open for a long time.
        let mut provider_rw =
            self.provider_factory.unwind_provider_rw()?.disable_long_read_transaction_safety();

        for stage in unwind_pipeline {
            let stage_id = stage.id();
            let span = info_span!("Unwinding", stage = %stage_id);
            let _enter = span.enter();

            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
            // A stage already at or below the target has nothing to unwind.
            if checkpoint.block_number < to {
                debug!(
                    target: "sync::pipeline",
                    from = %checkpoint.block_number,
                    %to,
                    "Unwind point too far for stage"
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                continue
            }

            info!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                ?bad_block,
                "Starting unwind"
            );
            // A stage may unwind in multiple steps; loop until its checkpoint reaches `to`.
            while checkpoint.block_number > to {
                let unwind_started_at = Instant::now();
                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

                let output = stage.unwind(&provider_rw, input);
                match output {
                    Ok(unwind_output) => {
                        checkpoint = unwind_output.checkpoint;
                        info!(
                            target: "sync::pipeline",
                            stage = %stage_id,
                            unwind_to = to,
                            progress = checkpoint.block_number,
                            done = checkpoint.block_number == to,
                            "Stage unwound"
                        );

                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                        // Notify event listeners and update metrics.
                        self.event_sender
                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                        if let Some(metrics_tx) = &mut self.metrics_tx {
                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                                stage_id,
                                checkpoint,
                                // We assume it was set in the previous execute iteration, so it
                                // doesn't change when we unwind.
                                max_block_number: None,
                                elapsed: unwind_started_at.elapsed(),
                            });
                        }

                        // update finalized and safe block if needed
                        let last_saved_finalized_block_number =
                            provider_rw.last_finalized_block_number()?;

                        // If None, that means the finalized block is not written so we should
                        // always save in that case
                        if last_saved_finalized_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_finalized_block_number
                        {
                            provider_rw.save_finalized_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;

                        if last_saved_safe_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_safe_block_number
                        {
                            provider_rw.save_safe_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        // Commit the step, run the stage's post-commit hook, then open a fresh
                        // write transaction for the next step/stage.
                        provider_rw.commit()?;

                        stage.post_unwind_commit()?;

                        provider_rw = self.provider_factory.unwind_provider_rw()?;
                    }
                    Err(err) => {
                        self.event_sender.notify(PipelineEvent::Error { stage_id });

                        // Any unwind failure is fatal: the uncommitted transaction is dropped.
                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                    }
                }
            }
        }

        Ok(())
    }
430
    /// Executes the stage at `stage_index` repeatedly until it reports `done`, the configured
    /// `max_block` is reached, or an error decides the outcome.
    ///
    /// The sync target is `max_block` if configured, otherwise `previous_stage` (the preceding
    /// stage's checkpoint). Each successful execution step is committed before the next one.
    /// Errors are routed through [`Self::on_stage_error`], which either yields a control flow
    /// to return, requests a retry, or fails the pipeline.
    async fn execute_stage_to_completion(
        &mut self,
        previous_stage: Option<BlockNumber>,
        stage_index: usize,
    ) -> Result<ControlFlow, PipelineError> {
        let total_stages = self.stages.len();

        let stage_id = self.stage(stage_index).id();
        let mut made_progress = false;
        let target = self.max_block.or(previous_stage);

        loop {
            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

            let stage_reached_max_block = prev_checkpoint
                .zip(self.max_block)
                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
            if stage_reached_max_block {
                warn!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    max_block = self.max_block,
                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
                    "Stage reached target block, skipping."
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                // We reached the maximum block, so we skip the stage
                return Ok(ControlFlow::NoProgress {
                    block_number: prev_checkpoint.map(|progress| progress.block_number),
                })
            }

            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

            self.event_sender.notify(PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            // Wait until the stage is ready to execute (e.g. downloads have arrived); a
            // readiness error is classified like any other stage error.
            if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
                self.event_sender.notify(PipelineEvent::Error { stage_id });
                match self.on_stage_error(stage_id, prev_checkpoint, err)? {
                    Some(ctrl) => return Ok(ctrl),
                    None => continue,
                };
            }

            let stage_started_at = Instant::now();
            let provider_rw = self.provider_factory.database_provider_rw()?;

            self.event_sender.notify(PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            match self.stage(stage_index).execute(&provider_rw, exec_input) {
                Ok(out @ ExecOutput { checkpoint, done }) => {
                    // Update stage checkpoint.
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    // Commit processed data to the database.
                    provider_rw.commit()?;

                    // Invoke stage post commit hook.
                    self.stage(stage_index).post_execute_commit()?;

                    // Notify event listeners and update metrics.
                    self.event_sender.notify(PipelineEvent::Ran {
                        pipeline_stages_progress: PipelineStagesProgress {
                            current: stage_index + 1,
                            total: total_stages,
                        },
                        stage_id,
                        result: out.clone(),
                    });
                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: target,
                            elapsed: stage_started_at.elapsed(),
                        });
                    }

                    // Track whether any iteration of this loop advanced the checkpoint, so a
                    // finished stage can be distinguished from a no-op pass.
                    let block_number = checkpoint.block_number;
                    let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
                    made_progress |= block_number != prev_block_number;
                    if done {
                        return Ok(if made_progress {
                            ControlFlow::Continue { block_number }
                        } else {
                            ControlFlow::NoProgress { block_number: Some(block_number) }
                        })
                    }
                }
                Err(err) => {
                    // Drop the open transaction without committing before handling the error.
                    drop(provider_rw);
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
                        return Ok(ctrl)
                    }
                }
            }
        }
    }
548
    /// Classifies a [`StageError`] raised during stage execution and decides how the pipeline
    /// proceeds.
    ///
    /// Returns:
    /// - `Ok(Some(ControlFlow::Unwind { .. }))` when the error requires an unwind (detached
    ///   head, bad block, missing static file data),
    /// - `Ok(None)` when the error is considered transient and the stage should be retried,
    /// - `Err(_)` when the error is fatal.
    fn on_stage_error(
        &mut self,
        stage_id: StageId,
        prev_checkpoint: Option<StageCheckpoint>,
        err: StageError,
    ) -> Result<Option<ControlFlow>, PipelineError> {
        if let StageError::DetachedHead { local_head, header, error } = err {
            warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");

            // Count consecutive detached-head unwinds for the same fork: if the new local head
            // is exactly the block we unwound to last time (and the offending header is its
            // direct child), the previous unwind did not resolve the detachment, so deepen it.
            if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
                if local_head.block.hash == last_detached_head_unwind_target &&
                    header.block.number == local_head.block.number + 1
                {
                    self.detached_head_attempts += 1;
                } else {
                    self.detached_head_attempts = 1;
                }
            } else {
                self.detached_head_attempts = 1;
            }

            // We unwind because of a detached head. The unwind depth scales with the number of
            // attempts, and never targets a block below 1.
            let unwind_to = local_head
                .block
                .number
                .saturating_sub(
                    BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
                )
                .max(1);

            // Remember the hash of the chosen target so repeat detachments can be detected.
            self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
            Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
        } else if let StageError::Block { block, error } = err {
            match error {
                BlockErrorKind::Validation(validation_error) => {
                    error!(
                        target: "sync::pipeline",
                        stage = %stage_id,
                        bad_block = %block.block.number,
                        "Stage encountered a validation error: {validation_error}"
                    );

                    // FIXME: When handling errors, we do not commit the database transaction. This
                    // leads to the Merkle stage not clearing its checkpoint, and restarting from an
                    // invalid place.
                    // Only reset MerkleExecute checkpoint if MerkleExecute itself failed
                    if stage_id == StageId::MerkleExecute {
                        let provider_rw = self.provider_factory.database_provider_rw()?;
                        provider_rw
                            .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
                        provider_rw.save_stage_checkpoint(
                            StageId::MerkleExecute,
                            prev_checkpoint.unwrap_or_default(),
                        )?;

                        provider_rw.commit()?;
                    }

                    // We unwind because of a validation error. If the unwind itself
                    // fails, we bail entirely,
                    // otherwise we restart the execution loop from the
                    // beginning.
                    Ok(Some(ControlFlow::Unwind {
                        target: prev_checkpoint.unwrap_or_default().block_number,
                        bad_block: block,
                    }))
                }
                BlockErrorKind::Execution(execution_error) => {
                    error!(
                        target: "sync::pipeline",
                        stage = %stage_id,
                        bad_block = %block.block.number,
                        "Stage encountered an execution error: {execution_error}"
                    );

                    // We unwind because of an execution error. If the unwind itself
                    // fails, we bail entirely,
                    // otherwise we restart
                    // the execution loop from the beginning.
                    Ok(Some(ControlFlow::Unwind {
                        target: prev_checkpoint.unwrap_or_default().block_number,
                        bad_block: block,
                    }))
                }
            }
        } else if let StageError::MissingStaticFileData { block, segment } = err {
            error!(
                target: "sync::pipeline",
                stage = %stage_id,
                bad_block = %block.block.number,
                segment = %segment,
                "Stage is missing static file data."
            );

            // Unwind to just below the block whose static file data is missing.
            Ok(Some(ControlFlow::Unwind {
                target: block.block.number.saturating_sub(1),
                bad_block: block,
            }))
        } else if err.is_fatal() {
            error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
            Err(err.into())
        } else {
            // On other errors we assume they are recoverable if we discard the
            // transaction and run the stage again.
            warn!(
                target: "sync::pipeline",
                stage = %stage_id,
                "Stage encountered a non-fatal error: {err}. Retrying..."
            );
            Ok(None)
        }
    }
661}
662
663impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
664    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
665        f.debug_struct("Pipeline")
666            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
667            .field("max_block", &self.max_block)
668            .field("event_sender", &self.event_sender)
669            .field("fail_on_unwind", &self.fail_on_unwind)
670            .finish()
671    }
672}
673
674#[cfg(test)]
675mod tests {
676    use std::sync::atomic::Ordering;
677
678    use super::*;
679    use crate::{test_utils::TestStage, UnwindOutput};
680    use assert_matches::assert_matches;
681    use reth_consensus::ConsensusError;
682    use reth_errors::ProviderError;
683    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
684    use reth_prune::PruneModes;
685    use reth_testing_utils::generators::{self, random_block_with_parent};
686    use tokio_stream::StreamExt;
687
688    #[test]
689    fn record_progress_calculates_outliers() {
690        let mut progress = PipelineProgress::default();
691
692        progress.update(10);
693        assert_eq!(progress.minimum_block_number, Some(10));
694        assert_eq!(progress.maximum_block_number, Some(10));
695
696        progress.update(20);
697        assert_eq!(progress.minimum_block_number, Some(10));
698        assert_eq!(progress.maximum_block_number, Some(20));
699
700        progress.update(1);
701        assert_eq!(progress.minimum_block_number, Some(1));
702        assert_eq!(progress.maximum_block_number, Some(20));
703    }
704
    #[test]
    fn progress_ctrl_flow() {
        let mut progress = PipelineProgress::default();

        // With no recorded block, the pipeline reports no progress.
        assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });

        // Once a block is recorded, control flow continues from that block.
        progress.update(1);
        assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
    }
714
715    /// Runs a simple pipeline.
716    #[tokio::test]
717    async fn run_pipeline() {
718        let provider_factory = create_test_provider_factory();
719
720        let stage_a = TestStage::new(StageId::Other("A"))
721            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
722        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
723        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
724
725        let stage_b = TestStage::new(StageId::Other("B"))
726            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
727        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
728        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
729
730        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
731            .add_stage(stage_a)
732            .add_stage(stage_b)
733            .with_max_block(10)
734            .build(
735                provider_factory.clone(),
736                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
737            );
738        let events = pipeline.events();
739
740        // Run pipeline
741        tokio::spawn(async move {
742            pipeline.run().await.unwrap();
743        });
744
745        // Check that the stages were run in order
746        assert_eq!(
747            events.collect::<Vec<PipelineEvent>>().await,
748            vec![
749                PipelineEvent::Prepare {
750                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
751                    stage_id: StageId::Other("A"),
752                    checkpoint: None,
753                    target: Some(10),
754                },
755                PipelineEvent::Run {
756                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
757                    stage_id: StageId::Other("A"),
758                    checkpoint: None,
759                    target: Some(10),
760                },
761                PipelineEvent::Ran {
762                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
763                    stage_id: StageId::Other("A"),
764                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
765                },
766                PipelineEvent::Prepare {
767                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
768                    stage_id: StageId::Other("B"),
769                    checkpoint: None,
770                    target: Some(10),
771                },
772                PipelineEvent::Run {
773                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
774                    stage_id: StageId::Other("B"),
775                    checkpoint: None,
776                    target: Some(10),
777                },
778                PipelineEvent::Ran {
779                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
780                    stage_id: StageId::Other("B"),
781                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
782                },
783            ]
784        );
785
786        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
787        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
788
789        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
790        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
791    }
792
    /// Unwinds a simple pipeline.
    ///
    /// Three stages (A, B, C) are synced to the max block, then the whole pipeline
    /// is unwound to block 1. Unwinding must proceed in reverse stage order
    /// (C, B, A), and every stage must fire its post-commit hook exactly once per
    /// phase (execute and unwind).
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Each stage succeeds on execute and unwinds back to block 1; the counters
        // record how many times the post-execute/post-unwind commit hooks fire.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let stage_c = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                // Unwinding (reverse order: C, then B, then A)
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("C"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(20),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("C"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("B"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("B"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
            ]
        );

        // Every stage executed once and unwound once, so each hook fired exactly once.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }
941
942    /// Unwinds a pipeline with intermediate progress.
943    #[tokio::test]
944    async fn unwind_pipeline_with_intermediate_progress() {
945        let provider_factory = create_test_provider_factory();
946
947        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
948            .add_stage(
949                TestStage::new(StageId::Other("A"))
950                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
951                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
952            )
953            .add_stage(
954                TestStage::new(StageId::Other("B"))
955                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
956            )
957            .with_max_block(10)
958            .build(
959                provider_factory.clone(),
960                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
961            );
962        let events = pipeline.events();
963
964        // Run pipeline
965        tokio::spawn(async move {
966            // Sync first
967            pipeline.run().await.expect("Could not run pipeline");
968
969            // Unwind
970            pipeline.unwind(50, None).expect("Could not unwind pipeline");
971        });
972
973        // Check that the stages were unwound in reverse order
974        assert_eq!(
975            events.collect::<Vec<PipelineEvent>>().await,
976            vec![
977                // Executing
978                PipelineEvent::Prepare {
979                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
980                    stage_id: StageId::Other("A"),
981                    checkpoint: None,
982                    target: Some(10),
983                },
984                PipelineEvent::Run {
985                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
986                    stage_id: StageId::Other("A"),
987                    checkpoint: None,
988                    target: Some(10),
989                },
990                PipelineEvent::Ran {
991                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
992                    stage_id: StageId::Other("A"),
993                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
994                },
995                PipelineEvent::Prepare {
996                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
997                    stage_id: StageId::Other("B"),
998                    checkpoint: None,
999                    target: Some(10),
1000                },
1001                PipelineEvent::Run {
1002                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1003                    stage_id: StageId::Other("B"),
1004                    checkpoint: None,
1005                    target: Some(10),
1006                },
1007                PipelineEvent::Ran {
1008                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1009                    stage_id: StageId::Other("B"),
1010                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1011                },
1012                // Unwinding
1013                // Nothing to unwind in stage "B"
1014                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
1015                PipelineEvent::Unwind {
1016                    stage_id: StageId::Other("A"),
1017                    input: UnwindInput {
1018                        checkpoint: StageCheckpoint::new(100),
1019                        unwind_to: 50,
1020                        bad_block: None
1021                    }
1022                },
1023                PipelineEvent::Unwound {
1024                    stage_id: StageId::Other("A"),
1025                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
1026                },
1027            ]
1028        );
1029    }
1030
    /// Runs a pipeline that unwinds during sync.
    ///
    /// The flow is:
    ///
    /// - Stage A syncs to block 10
    /// - Stage B triggers an unwind, marking block 5 as bad
    /// - Stage B unwinds to its previous progress, block 0 but since it is still at block 0, it is
    ///   skipped entirely (there is nothing to unwind)
    /// - Stage A unwinds to its previous progress, block 0
    /// - Stage A syncs back up to block 10
    /// - Stage B syncs to block 10
    /// - The pipeline finishes
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                // Stage A: succeeds, unwinds to 0 when asked, then succeeds again on retry.
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .add_stage(
                // Stage B: the first execution reports a validation failure on block 5,
                // which triggers the pipeline-wide unwind; the second execution succeeds.
                TestStage::new(StageId::Other("B"))
                    .add_exec(Err(StageError::Block {
                        block: Box::new(random_block_with_parent(
                            &mut generators::rng(),
                            5,
                            Default::default(),
                        )),
                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                    }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Stage A syncs to block 10.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // Stage B fails validation on block 5 and requests an unwind.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                // The bad block (5) forces stage A back to its previous checkpoint;
                // stage B is at block 0 already, so only A emits unwind events.
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 0,
                        bad_block: Some(5)
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
                },
                // Stage A re-syncs from its unwound checkpoint (0) back up to block 10.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // Stage B succeeds on the retry and the pipeline finishes.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );
    }
1162
1163    /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
1164    #[tokio::test]
1165    async fn pipeline_error_handling() {
1166        // Non-fatal
1167        let provider_factory = create_test_provider_factory();
1168        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1169            .add_stage(
1170                TestStage::new(StageId::Other("NonFatal"))
1171                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1172                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1173            )
1174            .with_max_block(10)
1175            .build(
1176                provider_factory.clone(),
1177                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1178            );
1179        let result = pipeline.run().await;
1180        assert_matches!(result, Ok(()));
1181
1182        // Fatal
1183        let provider_factory = create_test_provider_factory();
1184        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1185            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1186                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1187            )))
1188            .build(
1189                provider_factory.clone(),
1190                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1191            );
1192        let result = pipeline.run().await;
1193        assert_matches!(
1194            result,
1195            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1196                ProviderError::BlockBodyIndicesNotFound(5)
1197            )))
1198        );
1199    }
1200}