reth_stages_api/pipeline/mod.rs

mod ctrl;
mod event;
pub use crate::pipeline::ctrl::ControlFlow;
use crate::{PipelineTarget, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, B256};
pub use event::*;
use futures_util::Future;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
    ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
    StageCheckpointWriter,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use reth_tokio_util::{EventSender, EventStream};
use std::pin::Pin;
use tokio::sync::watch;
use tracing::*;

mod builder;
mod progress;
mod set;

use crate::{
    BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
    StageError, StageExt, UnwindInput,
};
pub use builder::*;
use progress::*;
use reth_errors::RethResult;
pub use set::*;

/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The pipeline type itself with the result of [`Pipeline::run_as_fut`]
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the
/// tip of the chain and the pipeline then executes each stage in order from the current local
/// chain tip to the external chain tip. When a stage is executed, it will run until it reaches
/// the chain tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [`Pipeline::set_max_block`]).
///
/// `include_mmd!("docs/mermaid/pipeline.mmd")`
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [`Pipeline::unwind`]).
///
/// # Defaults
///
/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
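/// # Example
///
/// A minimal usage sketch, modeled on the tests at the bottom of this module. It assumes the
/// `TestStage` helper from `crate::test_utils` and the mock provider factory from
/// `reth_provider::test_utils`; a real node would queue the
/// [`DefaultStages`](crate::sets::DefaultStages) against a database-backed provider factory
/// instead.
///
/// ```ignore
/// use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
/// use reth_prune::PruneModes;
/// use reth_static_file::StaticFileProducer;
///
/// let provider_factory = create_test_provider_factory();
///
/// // Queue a single stage that reports completion at block 10 and cap the sync there.
/// let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
///     .add_stage(
///         TestStage::new(StageId::Other("A"))
///             .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
///     )
///     .with_max_block(10)
///     .build(
///         provider_factory.clone(),
///         StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
///     );
///
/// // Run the stages in order until `max_block` is reached.
/// pipeline.run().await?;
/// ```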
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Provider factory.
    provider_factory: ProviderFactory<N>,
    /// All configured stages in the order they will be executed.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// The maximum block number to sync to.
    max_block: Option<BlockNumber>,
    /// Moves data from the database into static files.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender for events the pipeline emits.
    event_sender: EventSender<PipelineEvent>,
    /// Keeps track of the progress of the pipeline.
    progress: PipelineProgress,
    /// A sender for the current chain tip to sync to.
    ///
    /// This is used to notify the headers stage about a new sync target.
    tip_tx: Option<watch::Sender<B256>>,
    /// Sender for stage checkpoint metric events.
    metrics_tx: Option<MetricEventsSender>,
    /// Whether an unwind should fail the syncing process. Should only be set when downloading
    /// blocks from trusted sources and expecting them to be valid.
    fail_on_unwind: bool,
}

impl<N: ProviderNodeTypes> Pipeline<N> {
    /// Construct a pipeline using a [`PipelineBuilder`].
    pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
    {
        PipelineBuilder::default()
    }

    /// Return the minimum block number achieved by
    /// any stage during the execution of the pipeline.
    pub const fn minimum_block_number(&self) -> Option<u64> {
        self.progress.minimum_block_number
    }

    /// Set tip for reverse sync.
    #[track_caller]
    pub fn set_tip(&self, tip: B256) {
        let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
            warn!(target: "sync::pipeline", "Chain tip channel closed");
        });
    }

    /// Listen for events on the pipeline.
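    ///
    /// A short sketch of consuming the stream, assuming `tokio_stream::StreamExt` is in scope
    /// (as in the tests below):
    ///
    /// ```ignore
    /// use tokio_stream::StreamExt;
    ///
    /// let mut events = pipeline.events();
    /// tokio::spawn(async move {
    ///     while let Some(event) = events.next().await {
    ///         println!("pipeline event: {event:?}");
    ///     }
    /// });
    /// ```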
    pub fn events(&self) -> EventStream<PipelineEvent> {
        self.event_sender.new_listener()
    }
}

impl<N: ProviderNodeTypes> Pipeline<N> {
    /// Registers progress metrics for each registered stage
    pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
        let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
        let provider = self.provider_factory.provider()?;

        for stage in &self.stages {
            let stage_id = stage.id();
            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                stage_id,
                checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
                max_block_number: None,
            });
        }
        Ok(())
    }

    /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
    /// pipeline and its result as a future.
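    ///
    /// A minimal sketch of driving the returned future, assuming `tip` is a hypothetical `B256`
    /// block hash and `pipeline` was built via [`Pipeline::builder`]:
    ///
    /// ```ignore
    /// // Ask the pipeline to sync towards `tip`, then take ownership of it back afterwards.
    /// let fut = pipeline.run_as_fut(Some(PipelineTarget::Sync(tip)));
    /// let (pipeline, result) = fut.await;
    /// let control_flow = result?;
    /// ```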
    #[track_caller]
    pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
        let _ = self.register_metrics();
        Box::pin(async move {
            // NOTE: the tip should only be None if we are in continuous sync mode.
            if let Some(target) = target {
                match target {
                    PipelineTarget::Sync(tip) => self.set_tip(tip),
                    PipelineTarget::Unwind(target) => {
                        if let Err(err) = self.move_to_static_files() {
                            return (self, Err(err.into()))
                        }
                        if let Err(err) = self.unwind(target, None) {
                            return (self, Err(err))
                        }
                        self.progress.update(target);

                        return (self, Ok(ControlFlow::Continue { block_number: target }))
                    }
                }
            }

            let result = self.run_loop().await;
            trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
            (self, result)
        })
    }

    /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
    /// a `max_block` in the pipeline.
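    ///
    /// A usage sketch mirroring the tests below, where the pipeline is driven on its own task:
    ///
    /// ```ignore
    /// tokio::spawn(async move {
    ///     // Runs repeatedly until `max_block` is reached or a fatal error occurs.
    ///     pipeline.run().await.expect("pipeline failed");
    /// });
    /// ```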
    pub async fn run(&mut self) -> Result<(), PipelineError> {
        let _ = self.register_metrics(); // ignore error

        loop {
            let next_action = self.run_loop().await?;

            if next_action.is_unwind() && self.fail_on_unwind {
                return Err(PipelineError::UnexpectedUnwind)
            }

            // Terminate the loop early if it's reached the maximum user
            // configured block.
            if next_action.should_continue() &&
                self.progress
                    .minimum_block_number
                    .zip(self.max_block)
                    .is_some_and(|(progress, target)| progress >= target)
            {
                trace!(
                    target: "sync::pipeline",
                    ?next_action,
                    minimum_block_number = ?self.progress.minimum_block_number,
                    max_block = ?self.max_block,
                    "Terminating pipeline."
                );
                return Ok(())
            }
        }
    }

    /// Performs one pass of the pipeline across all stages. After successful
    /// execution of each stage, it proceeds to commit it to the database.
    ///
    /// If any stage is unsuccessful at execution, we proceed to
    /// unwind. This will undo the progress across the entire pipeline
    /// up to the block that caused the error.
    ///
    /// Returns the control flow after it ran the pipeline.
    /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage in
    /// the pipeline (for example the `Finish` stage), or [`ControlFlow::Unwind`] of the stage
    /// that caused the unwind.
    pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
        self.move_to_static_files()?;

        let mut previous_stage = None;
        for stage_index in 0..self.stages.len() {
            let stage = &self.stages[stage_index];
            let stage_id = stage.id();

            trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
            let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;

            trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");

            match next {
                ControlFlow::NoProgress { block_number } => {
                    if let Some(block_number) = block_number {
                        self.progress.update(block_number);
                    }
                }
                ControlFlow::Continue { block_number } => self.progress.update(block_number),
                ControlFlow::Unwind { target, bad_block } => {
                    self.unwind(target, Some(bad_block.block.number))?;
                    return Ok(ControlFlow::Unwind { target, bad_block })
                }
            }

            previous_stage = Some(
                self.provider_factory
                    .provider()?
                    .get_stage_checkpoint(stage_id)?
                    .unwrap_or_default()
                    .block_number,
            );
        }

        Ok(self.progress.next_ctrl())
    }

    /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
    /// all data from the database to static files for corresponding
    /// [segments](reth_static_file_types::StaticFileSegment), according to their [stage
    /// checkpoints](StageCheckpoint):
    /// - [`StaticFileSegment::Headers`](reth_static_file_types::StaticFileSegment::Headers) ->
    ///   [`StageId::Headers`]
    /// - [`StaticFileSegment::Receipts`](reth_static_file_types::StaticFileSegment::Receipts) ->
    ///   [`StageId::Execution`]
    /// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
    ///   -> [`StageId::Bodies`]
    ///
    /// CAUTION: This method locks the static file producer Mutex, hence it can block the thread
    /// if the lock is occupied.
    pub fn move_to_static_files(&self) -> RethResult<()> {
        // Copies data from the database to static files.
        let lowest_static_file_height =
            self.static_file_producer.lock().copy_to_static_files()?.min_block_num();

        // Deletes data which has been copied to static files.
        if let Some(prune_tip) = lowest_static_file_height {
            // Run the pruner so we don't potentially end up with a higher height in the database
            // than in static files during a pipeline unwind
            let mut pruner = PrunerBuilder::new(Default::default())
                .delete_limit(usize::MAX)
                .build_with_provider_factory(self.provider_factory.clone());

            pruner.run(prune_tip)?;
        }

        Ok(())
    }

    /// Unwind the stages to the target block (exclusive).
    ///
    /// If the unwind is due to a bad block, the number of that block should be specified.
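    ///
    /// A brief sketch of a manual unwind; `bad_block_number` is a hypothetical block that failed
    /// validation, and `None` is passed when the unwind is not caused by a bad block:
    ///
    /// ```ignore
    /// // Roll every stage back so its checkpoint is at block 1000; block 1000 itself is kept.
    /// pipeline.unwind(1000, Some(bad_block_number))?;
    /// ```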
    pub fn unwind(
        &mut self,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> Result<(), PipelineError> {
        // Unwind stages in reverse order of execution
        let unwind_pipeline = self.stages.iter_mut().rev();

        // Legacy Engine: This prevents a race condition in which the `StaticFileProducer` could
        // attempt to proceed with a finalized block which has been unwound
        let _locked_sf_producer = self.static_file_producer.lock();

        let mut provider_rw = self.provider_factory.database_provider_rw()?;

        for stage in unwind_pipeline {
            let stage_id = stage.id();
            let span = info_span!("Unwinding", stage = %stage_id);
            let _enter = span.enter();

            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
            if checkpoint.block_number < to {
                debug!(
                    target: "sync::pipeline",
                    from = %checkpoint.block_number,
                    %to,
                    "Unwind point too far for stage"
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                continue
            }

            info!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                ?bad_block,
                "Starting unwind"
            );
            while checkpoint.block_number > to {
                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

                let output = stage.unwind(&provider_rw, input);
                match output {
                    Ok(unwind_output) => {
                        checkpoint = unwind_output.checkpoint;
                        info!(
                            target: "sync::pipeline",
                            stage = %stage_id,
                            unwind_to = to,
                            progress = checkpoint.block_number,
                            done = checkpoint.block_number == to,
                            "Stage unwound"
                        );
                        if let Some(metrics_tx) = &mut self.metrics_tx {
                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                                stage_id,
                                checkpoint,
                                // We assume it was set in the previous execute iteration, so it
                                // doesn't change when we unwind.
                                max_block_number: None,
                            });
                        }
                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                        self.event_sender
                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                        // update finalized block if needed
                        let last_saved_finalized_block_number =
                            provider_rw.last_finalized_block_number()?;

                        // If None, that means the finalized block is not written, so we should
                        // always save in that case
                        if last_saved_finalized_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_finalized_block_number
                        {
                            provider_rw.save_finalized_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        UnifiedStorageWriter::commit_unwind(provider_rw)?;

                        stage.post_unwind_commit()?;

                        provider_rw = self.provider_factory.database_provider_rw()?;
                    }
                    Err(err) => {
                        self.event_sender.notify(PipelineEvent::Error { stage_id });

                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                    }
                }
            }
        }

        Ok(())
    }

    async fn execute_stage_to_completion(
        &mut self,
        previous_stage: Option<BlockNumber>,
        stage_index: usize,
    ) -> Result<ControlFlow, PipelineError> {
        let total_stages = self.stages.len();

        let stage = &mut self.stages[stage_index];
        let stage_id = stage.id();
        let mut made_progress = false;
        let target = self.max_block.or(previous_stage);

        loop {
            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

            let stage_reached_max_block = prev_checkpoint
                .zip(self.max_block)
                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
            if stage_reached_max_block {
                warn!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    max_block = self.max_block,
                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
                    "Stage reached target block, skipping."
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                // We reached the maximum block, so we skip the stage
                return Ok(ControlFlow::NoProgress {
                    block_number: prev_checkpoint.map(|progress| progress.block_number),
                })
            }

            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

            self.event_sender.notify(PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            if let Err(err) = stage.execute_ready(exec_input).await {
                self.event_sender.notify(PipelineEvent::Error { stage_id });

                match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
                    Some(ctrl) => return Ok(ctrl),
                    None => continue,
                };
            }

            let provider_rw = self.provider_factory.database_provider_rw()?;

            self.event_sender.notify(PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            match stage.execute(&provider_rw, exec_input) {
                Ok(out @ ExecOutput { checkpoint, done }) => {
                    made_progress |=
                        checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;

                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: target,
                        });
                    }
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    self.event_sender.notify(PipelineEvent::Ran {
                        pipeline_stages_progress: PipelineStagesProgress {
                            current: stage_index + 1,
                            total: total_stages,
                        },
                        stage_id,
                        result: out.clone(),
                    });

                    UnifiedStorageWriter::commit(provider_rw)?;

                    stage.post_execute_commit()?;

                    if done {
                        let block_number = checkpoint.block_number;
                        return Ok(if made_progress {
                            ControlFlow::Continue { block_number }
                        } else {
                            ControlFlow::NoProgress { block_number: Some(block_number) }
                        })
                    }
                }
                Err(err) => {
                    drop(provider_rw);
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    if let Some(ctrl) =
                        on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
                    {
                        return Ok(ctrl)
                    }
                }
            }
        }
    }
}

fn on_stage_error<N: ProviderNodeTypes>(
    factory: &ProviderFactory<N>,
    stage_id: StageId,
    prev_checkpoint: Option<StageCheckpoint>,
    err: StageError,
) -> Result<Option<ControlFlow>, PipelineError> {
    if let StageError::DetachedHead { local_head, header, error } = err {
        warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");

        // We unwind because of a detached head.
        let unwind_to =
            local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
        Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
    } else if let StageError::Block { block, error } = err {
        match error {
            BlockErrorKind::Validation(validation_error) => {
                error!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    bad_block = %block.block.number,
                    "Stage encountered a validation error: {validation_error}"
                );

                // FIXME: When handling errors, we do not commit the database transaction. This
                // leads to the Merkle stage not clearing its checkpoint, and restarting from an
                // invalid place.
                let provider_rw = factory.database_provider_rw()?;
                provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
                provider_rw.save_stage_checkpoint(
                    StageId::MerkleExecute,
                    prev_checkpoint.unwrap_or_default(),
                )?;

                UnifiedStorageWriter::commit(provider_rw)?;

                // We unwind because of a validation error. If the unwind itself fails, we bail
                // entirely, otherwise we restart the execution loop from the beginning.
                Ok(Some(ControlFlow::Unwind {
                    target: prev_checkpoint.unwrap_or_default().block_number,
                    bad_block: block,
                }))
            }
            BlockErrorKind::Execution(execution_error) => {
                error!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    bad_block = %block.block.number,
                    "Stage encountered an execution error: {execution_error}"
                );

                // We unwind because of an execution error. If the unwind itself fails, we bail
                // entirely, otherwise we restart the execution loop from the beginning.
                Ok(Some(ControlFlow::Unwind {
                    target: prev_checkpoint.unwrap_or_default().block_number,
                    bad_block: block,
                }))
            }
        }
    } else if let StageError::MissingStaticFileData { block, segment } = err {
        error!(
            target: "sync::pipeline",
            stage = %stage_id,
            bad_block = %block.block.number,
            segment = %segment,
            "Stage is missing static file data."
        );

        Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
    } else if err.is_fatal() {
        error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
        Err(err.into())
    } else {
        // On other errors we assume they are recoverable if we discard the
        // transaction and run the stage again.
        warn!(
            target: "sync::pipeline",
            stage = %stage_id,
            "Stage encountered a non-fatal error: {err}. Retrying..."
        );
        Ok(None)
    }
}

impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Pipeline")
            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
            .field("max_block", &self.max_block)
            .field("event_sender", &self.event_sender)
            .field("fail_on_unwind", &self.fail_on_unwind)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::*;
    use crate::{test_utils::TestStage, UnwindOutput};
    use assert_matches::assert_matches;
    use reth_consensus::ConsensusError;
    use reth_errors::ProviderError;
    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
    use reth_prune::PruneModes;
    use reth_testing_utils::generators::{self, random_block_with_parent};
    use tokio_stream::StreamExt;

    #[test]
    fn record_progress_calculates_outliers() {
        let mut progress = PipelineProgress::default();

        progress.update(10);
        assert_eq!(progress.minimum_block_number, Some(10));
        assert_eq!(progress.maximum_block_number, Some(10));

        progress.update(20);
        assert_eq!(progress.minimum_block_number, Some(10));
        assert_eq!(progress.maximum_block_number, Some(20));

        progress.update(1);
        assert_eq!(progress.minimum_block_number, Some(1));
        assert_eq!(progress.maximum_block_number, Some(20));
    }

    #[test]
    fn progress_ctrl_flow() {
        let mut progress = PipelineProgress::default();

        assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });

        progress.update(1);
        assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
    }

    /// Runs a simple pipeline.
    #[tokio::test]
    async fn run_pipeline() {
        let provider_factory = create_test_provider_factory();

        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.unwrap();
        });

        // Check that the stages were run in order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );

        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
    }

    /// Unwinds a simple pipeline.
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let stage_c = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                // Unwinding
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("C"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(20),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("C"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("B"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("B"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
            ]
        );

        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }

    /// Unwinds a pipeline with intermediate progress.
    #[tokio::test]
    async fn unwind_pipeline_with_intermediate_progress() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
            )
            .add_stage(
                TestStage::new(StageId::Other("B"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind
            pipeline.unwind(50, None).expect("Could not unwind pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // Unwinding
                // Nothing to unwind in stage "B"
                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 50,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
                },
            ]
        );
    }

    /// Runs a pipeline that unwinds during sync.
    ///
    /// The flow is:
    ///
    /// - Stage A syncs to block 10
    /// - Stage B triggers an unwind, marking block 5 as bad
    /// - Stage B unwinds to its previous progress, block 0, but since it is still at block 0, it
    ///   is skipped entirely (there is nothing to unwind)
    /// - Stage A unwinds to its previous progress, block 0
    /// - Stage A syncs back up to block 10
    /// - Stage B syncs to block 10
    /// - The pipeline finishes
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .add_stage(
                TestStage::new(StageId::Other("B"))
                    .add_exec(Err(StageError::Block {
                        block: Box::new(random_block_with_parent(
                            &mut generators::rng(),
                            5,
                            Default::default(),
                        )),
                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                    }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 0,
                        bad_block: Some(5)
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );
    }

    /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
    #[tokio::test]
    async fn pipeline_error_handling() {
        // Non-fatal
        let provider_factory = create_test_provider_factory();
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("NonFatal"))
                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(result, Ok(()));

        // Fatal
        let provider_factory = create_test_provider_factory();
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
            )))
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(
            result,
            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
                ProviderError::BlockBodyIndicesNotFound(5)
            )))
        );
    }
}