reth_stages_api/pipeline/mod.rs

mod ctrl;
mod event;
pub use crate::pipeline::ctrl::ControlFlow;
use crate::{PipelineTarget, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, B256};
pub use event::*;
use futures_util::Future;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
    ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
    StageCheckpointWriter,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use reth_tokio_util::{EventSender, EventStream};
use std::pin::Pin;
use tokio::sync::watch;
use tracing::*;

mod builder;
mod progress;
mod set;

use crate::{
    BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
    StageError, StageExt, UnwindInput,
};
pub use builder::*;
use progress::*;
use reth_errors::RethResult;
pub use set::*;

/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The pipeline type itself with the result of [`Pipeline::run_as_fut`]
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);

#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the
/// tip of the chain, and the pipeline then executes each stage in order, syncing from the current
/// local chain tip up to the external chain tip. When a stage is executed, it runs until it
/// reaches the chain tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [`Pipeline::set_max_block`]).
///
/// `include_mmd!("docs/mermaid/pipeline.mmd")`
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [`Pipeline::unwind`]).
///
/// # Defaults
///
/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
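///
/// # Example
///
/// A rough usage sketch (not compiled as a doctest); `N` stands for a concrete
/// [`ProviderNodeTypes`] implementation, while `my_stage`, `provider_factory`,
/// `static_file_producer` and `max_block` stand in for values constructed elsewhere:
///
/// ```ignore
/// let mut pipeline = Pipeline::<N>::builder()
///     .add_stage(my_stage)
///     .with_max_block(max_block)
///     .build(provider_factory, static_file_producer);
///
/// // Subscribe to pipeline events before running, if progress reporting is needed.
/// let mut events = pipeline.events();
///
/// // Runs until `max_block` is reached (or indefinitely if no maximum is configured).
/// pipeline.run().await?;
/// ```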
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Provider factory.
    provider_factory: ProviderFactory<N>,
    /// All configured stages in the order they will be executed.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// The maximum block number to sync to.
    max_block: Option<BlockNumber>,
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender for events the pipeline emits.
    event_sender: EventSender<PipelineEvent>,
    /// Keeps track of the progress of the pipeline.
    progress: PipelineProgress,
    /// A sender for the current chain tip to sync to.
    tip_tx: Option<watch::Sender<B256>>,
    metrics_tx: Option<MetricEventsSender>,
    /// Whether an unwind should fail the syncing process. Should only be set when downloading
    /// blocks from trusted sources and expecting them to be valid.
    fail_on_unwind: bool,
}

impl<N: ProviderNodeTypes> Pipeline<N> {
    /// Construct a pipeline using a [`PipelineBuilder`].
    pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
    {
        PipelineBuilder::default()
    }

    /// Return the minimum block number achieved by any stage during the execution of the
    /// pipeline.
    pub const fn minimum_block_number(&self) -> Option<u64> {
        self.progress.minimum_block_number
    }

    /// Set tip for reverse sync.
    #[track_caller]
    pub fn set_tip(&self, tip: B256) {
        let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
            warn!(target: "sync::pipeline", "Chain tip channel closed");
        });
    }

    /// Listen for events on the pipeline.
    pub fn events(&self) -> EventStream<PipelineEvent> {
        self.event_sender.new_listener()
    }
}

impl<N: ProviderNodeTypes> Pipeline<N> {
    /// Registers progress metrics for each registered stage.
    pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
        let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
        let provider = self.provider_factory.provider()?;

        for stage in &self.stages {
            let stage_id = stage.id();
            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                stage_id,
                checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
                max_block_number: None,
            });
        }
        Ok(())
    }

    /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
    /// pipeline and its result as a future.
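    ///
    /// A minimal sketch of driving the returned future, assuming a `pipeline` built as in the
    /// type-level example and a known target hash `tip`:
    ///
    /// ```ignore
    /// let fut = pipeline.run_as_fut(Some(PipelineTarget::Sync(tip)));
    /// // The future resolves to the owned pipeline together with the run result, so the caller
    /// // can inspect the outcome and reuse the pipeline afterwards.
    /// let (pipeline, result) = fut.await;
    /// ```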
    #[track_caller]
    pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
        let _ = self.register_metrics();
        Box::pin(async move {
            // NOTE: the target should only be None if we are in continuous sync mode.
            if let Some(target) = target {
                match target {
                    PipelineTarget::Sync(tip) => self.set_tip(tip),
                    PipelineTarget::Unwind(target) => {
                        if let Err(err) = self.move_to_static_files() {
                            return (self, Err(err.into()))
                        }
                        if let Err(err) = self.unwind(target, None) {
                            return (self, Err(err))
                        }
                        self.progress.update(target);

                        return (self, Ok(ControlFlow::Continue { block_number: target }))
                    }
                }
            }

            let result = self.run_loop().await;
            trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
            (self, result)
        })
    }

    /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
    /// a `max_block` in the pipeline.
    pub async fn run(&mut self) -> Result<(), PipelineError> {
        let _ = self.register_metrics(); // ignore error

        loop {
            let next_action = self.run_loop().await?;

            if next_action.is_unwind() && self.fail_on_unwind {
                return Err(PipelineError::UnexpectedUnwind)
            }

            // Terminate the loop early if it's reached the maximum user-configured block.
            if next_action.should_continue() &&
                self.progress
                    .minimum_block_number
                    .zip(self.max_block)
                    .is_some_and(|(progress, target)| progress >= target)
            {
                trace!(
                    target: "sync::pipeline",
                    ?next_action,
                    minimum_block_number = ?self.progress.minimum_block_number,
                    max_block = ?self.max_block,
                    "Terminating pipeline."
                );
                return Ok(())
            }
        }
    }

    /// Performs one pass of the pipeline across all stages. After each stage executes
    /// successfully, its changes are committed to the database.
    ///
    /// If any stage fails to execute, we proceed to unwind. This will undo the progress across
    /// the entire pipeline up to the block that caused the error.
    ///
    /// Returns the control flow after it ran the pipeline.
    /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage
    /// in the pipeline (for example the `Finish` stage), or [`ControlFlow::Unwind`] of the stage
    /// that caused the unwind.
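    ///
    /// A hedged sketch of how a caller might act on the returned [`ControlFlow`] (the
    /// [`Pipeline::run`] loop does essentially this via the control flow's helper methods):
    ///
    /// ```ignore
    /// match pipeline.run_loop().await? {
    ///     ControlFlow::Continue { block_number } => { /* progressed up to `block_number` */ }
    ///     ControlFlow::NoProgress { block_number } => { /* nothing new was synced */ }
    ///     ControlFlow::Unwind { target, bad_block } => { /* an unwind to `target` occurred */ }
    /// }
    /// ```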
    pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
        self.move_to_static_files()?;

        let mut previous_stage = None;
        for stage_index in 0..self.stages.len() {
            let stage = &self.stages[stage_index];
            let stage_id = stage.id();

            trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
            let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;

            trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");

            match next {
                ControlFlow::NoProgress { block_number } => {
                    if let Some(block_number) = block_number {
                        self.progress.update(block_number);
                    }
                }
                ControlFlow::Continue { block_number } => self.progress.update(block_number),
                ControlFlow::Unwind { target, bad_block } => {
                    self.unwind(target, Some(bad_block.block.number))?;
                    return Ok(ControlFlow::Unwind { target, bad_block })
                }
            }

            previous_stage = Some(
                self.provider_factory
                    .provider()?
                    .get_stage_checkpoint(stage_id)?
                    .unwrap_or_default()
                    .block_number,
            );
        }

        Ok(self.progress.next_ctrl())
    }

    /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
    /// all data from the database to static files for corresponding
    /// [segments](reth_static_file_types::StaticFileSegment), according to their [stage
    /// checkpoints](StageCheckpoint):
    /// - [`StaticFileSegment::Headers`](reth_static_file_types::StaticFileSegment::Headers) ->
    ///   [`StageId::Headers`]
    /// - [`StaticFileSegment::Receipts`](reth_static_file_types::StaticFileSegment::Receipts) ->
    ///   [`StageId::Execution`]
    /// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
    ///   -> [`StageId::Bodies`]
    ///
    /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if
    /// the lock is occupied.
    pub fn move_to_static_files(&self) -> RethResult<()> {
        // Copies data from database to static files
        let lowest_static_file_height =
            self.static_file_producer.lock().copy_to_static_files()?.min_block_num();

        // Deletes data which has been copied to static files.
        if let Some(prune_tip) = lowest_static_file_height {
            // Run the pruner so we don't potentially end up with higher height in the database vs
            // static files during a pipeline unwind
            let mut pruner = PrunerBuilder::new(Default::default())
                .delete_limit(usize::MAX)
                .build_with_provider_factory(self.provider_factory.clone());

            pruner.run(prune_tip)?;
        }

        Ok(())
    }

    /// Unwind the stages to the target block (exclusive).
    ///
    /// If the unwind is due to a bad block, the number of that block should be specified.
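    ///
    /// For example, to roll every stage back so that block 1 becomes the new tip (as the tests in
    /// this module do), without attributing the unwind to a bad block:
    ///
    /// ```ignore
    /// pipeline.unwind(1, None)?;
    /// ```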
    pub fn unwind(
        &mut self,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> Result<(), PipelineError> {
        // Unwind stages in reverse order of execution
        let unwind_pipeline = self.stages.iter_mut().rev();

        // Legacy Engine: This prevents a race condition in which the `StaticFileProducer` could
        // attempt to proceed with a finalized block which has been unwound
        let _locked_sf_producer = self.static_file_producer.lock();

        let mut provider_rw = self.provider_factory.database_provider_rw()?;

        for stage in unwind_pipeline {
            let stage_id = stage.id();
            let span = info_span!("Unwinding", stage = %stage_id);
            let _enter = span.enter();

            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
            if checkpoint.block_number < to {
                debug!(
                    target: "sync::pipeline",
                    from = %checkpoint.block_number,
                    %to,
                    "Unwind point too far for stage"
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                continue
            }

            info!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                ?bad_block,
                "Starting unwind"
            );
            while checkpoint.block_number > to {
                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

                let output = stage.unwind(&provider_rw, input);
                match output {
                    Ok(unwind_output) => {
                        checkpoint = unwind_output.checkpoint;
                        info!(
                            target: "sync::pipeline",
                            stage = %stage_id,
                            unwind_to = to,
                            progress = checkpoint.block_number,
                            done = checkpoint.block_number == to,
                            "Stage unwound"
                        );
                        if let Some(metrics_tx) = &mut self.metrics_tx {
                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                                stage_id,
                                checkpoint,
                                // We assume it was set in the previous execute iteration, so it
                                // doesn't change when we unwind.
                                max_block_number: None,
                            });
                        }
                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                        self.event_sender
                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                        // update finalized block if needed
                        let last_saved_finalized_block_number =
                            provider_rw.last_finalized_block_number()?;

                        // If None, that means the finalized block is not written, so we should
                        // always save in that case.
                        if last_saved_finalized_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_finalized_block_number
                        {
                            provider_rw.save_finalized_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        UnifiedStorageWriter::commit_unwind(provider_rw)?;

                        stage.post_unwind_commit()?;

                        provider_rw = self.provider_factory.database_provider_rw()?;
                    }
                    Err(err) => {
                        self.event_sender.notify(PipelineEvent::Error { stage_id });

                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                    }
                }
            }
        }

        Ok(())
    }

    async fn execute_stage_to_completion(
        &mut self,
        previous_stage: Option<BlockNumber>,
        stage_index: usize,
    ) -> Result<ControlFlow, PipelineError> {
        let total_stages = self.stages.len();

        let stage = &mut self.stages[stage_index];
        let stage_id = stage.id();
        let mut made_progress = false;
        let target = self.max_block.or(previous_stage);

        loop {
            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

            let stage_reached_max_block = prev_checkpoint
                .zip(self.max_block)
                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
            if stage_reached_max_block {
                warn!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    max_block = self.max_block,
                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
                    "Stage reached target block, skipping."
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                // We reached the maximum block, so we skip the stage
                return Ok(ControlFlow::NoProgress {
                    block_number: prev_checkpoint.map(|progress| progress.block_number),
                })
            }

            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

            self.event_sender.notify(PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            if let Err(err) = stage.execute_ready(exec_input).await {
                self.event_sender.notify(PipelineEvent::Error { stage_id });

                match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
                    Some(ctrl) => return Ok(ctrl),
                    None => continue,
                };
            }

            let provider_rw = self.provider_factory.database_provider_rw()?;

            self.event_sender.notify(PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            match stage.execute(&provider_rw, exec_input) {
                Ok(out @ ExecOutput { checkpoint, done }) => {
                    made_progress |=
                        checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;

                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: target,
                        });
                    }
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    self.event_sender.notify(PipelineEvent::Ran {
                        pipeline_stages_progress: PipelineStagesProgress {
                            current: stage_index + 1,
                            total: total_stages,
                        },
                        stage_id,
                        result: out.clone(),
                    });

                    UnifiedStorageWriter::commit(provider_rw)?;

                    stage.post_execute_commit()?;

                    if done {
                        let block_number = checkpoint.block_number;
                        return Ok(if made_progress {
                            ControlFlow::Continue { block_number }
                        } else {
                            ControlFlow::NoProgress { block_number: Some(block_number) }
                        })
                    }
                }
                Err(err) => {
                    drop(provider_rw);
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    if let Some(ctrl) =
                        on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
                    {
                        return Ok(ctrl)
                    }
                }
            }
        }
    }
}

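/// Maps a [`StageError`] to the pipeline's reaction to it: returns
/// `Ok(Some(ControlFlow::Unwind))` when the error calls for an unwind (detached head, bad block,
/// missing static file data), `Err(..)` when the error is fatal, and `Ok(None)` when the error is
/// considered transient and the stage should simply be retried.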
fn on_stage_error<N: ProviderNodeTypes>(
    factory: &ProviderFactory<N>,
    stage_id: StageId,
    prev_checkpoint: Option<StageCheckpoint>,
    err: StageError,
) -> Result<Option<ControlFlow>, PipelineError> {
    if let StageError::DetachedHead { local_head, header, error } = err {
        warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");

        // We unwind because of a detached head.
        let unwind_to =
            local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
        Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
    } else if let StageError::Block { block, error } = err {
        match error {
            BlockErrorKind::Validation(validation_error) => {
                error!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    bad_block = %block.block.number,
                    "Stage encountered a validation error: {validation_error}"
                );

                // FIXME: When handling errors, we do not commit the database transaction. This
                // leads to the Merkle stage not clearing its checkpoint, and restarting from an
                // invalid place.
                let provider_rw = factory.database_provider_rw()?;
                provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
                provider_rw.save_stage_checkpoint(
                    StageId::MerkleExecute,
                    prev_checkpoint.unwrap_or_default(),
                )?;

                UnifiedStorageWriter::commit(provider_rw)?;

                // We unwind because of a validation error. If the unwind itself fails, we bail
                // entirely, otherwise we restart the execution loop from the beginning.
                Ok(Some(ControlFlow::Unwind {
                    target: prev_checkpoint.unwrap_or_default().block_number,
                    bad_block: block,
                }))
            }
            BlockErrorKind::Execution(execution_error) => {
                error!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    bad_block = %block.block.number,
                    "Stage encountered an execution error: {execution_error}"
                );

                // We unwind because of an execution error. If the unwind itself fails, we bail
                // entirely, otherwise we restart the execution loop from the beginning.
                Ok(Some(ControlFlow::Unwind {
                    target: prev_checkpoint.unwrap_or_default().block_number,
                    bad_block: block,
                }))
            }
        }
    } else if let StageError::MissingStaticFileData { block, segment } = err {
        error!(
            target: "sync::pipeline",
            stage = %stage_id,
            bad_block = %block.block.number,
            segment = %segment,
            "Stage is missing static file data."
        );

        Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
    } else if err.is_fatal() {
        error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
        Err(err.into())
    } else {
        // On other errors we assume they are recoverable if we discard the
        // transaction and run the stage again.
        warn!(
            target: "sync::pipeline",
            stage = %stage_id,
            "Stage encountered a non-fatal error: {err}. Retrying..."
        );
        Ok(None)
    }
}

impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Pipeline")
            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
            .field("max_block", &self.max_block)
            .field("event_sender", &self.event_sender)
            .field("fail_on_unwind", &self.fail_on_unwind)
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::*;
    use crate::{test_utils::TestStage, UnwindOutput};
    use assert_matches::assert_matches;
    use reth_consensus::ConsensusError;
    use reth_errors::ProviderError;
    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
    use reth_prune::PruneModes;
    use reth_testing_utils::generators::{self, random_block_with_parent};
    use tokio_stream::StreamExt;

    #[test]
    fn record_progress_calculates_outliers() {
        let mut progress = PipelineProgress::default();

        progress.update(10);
        assert_eq!(progress.minimum_block_number, Some(10));
        assert_eq!(progress.maximum_block_number, Some(10));

        progress.update(20);
        assert_eq!(progress.minimum_block_number, Some(10));
        assert_eq!(progress.maximum_block_number, Some(20));

        progress.update(1);
        assert_eq!(progress.minimum_block_number, Some(1));
        assert_eq!(progress.maximum_block_number, Some(20));
    }

    #[test]
    fn progress_ctrl_flow() {
        let mut progress = PipelineProgress::default();

        assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });

        progress.update(1);
        assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
    }

    /// Runs a simple pipeline.
    #[tokio::test]
    async fn run_pipeline() {
        let provider_factory = create_test_provider_factory();

        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.unwrap();
        });

        // Check that the stages were run in order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );

        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
    }

    /// Unwinds a simple pipeline.
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let stage_c = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                // Unwinding
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("C"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(20),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("C"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("B"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("B"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
            ]
        );

        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }

    /// Unwinds a pipeline with intermediate progress.
    #[tokio::test]
    async fn unwind_pipeline_with_intermediate_progress() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
            )
            .add_stage(
                TestStage::new(StageId::Other("B"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind
            pipeline.unwind(50, None).expect("Could not unwind pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // Unwinding
                // Nothing to unwind in stage "B"
                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 50,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
                },
            ]
        );
    }

    /// Runs a pipeline that unwinds during sync.
    ///
    /// The flow is:
    ///
    /// - Stage A syncs to block 10
    /// - Stage B triggers an unwind, marking block 5 as bad
    /// - Stage B unwinds to its previous progress, block 0, but since it is still at block 0, it
    ///   is skipped entirely (there is nothing to unwind)
    /// - Stage A unwinds to its previous progress, block 0
    /// - Stage A syncs back up to block 10
    /// - Stage B syncs to block 10
    /// - The pipeline finishes
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .add_stage(
                TestStage::new(StageId::Other("B"))
                    .add_exec(Err(StageError::Block {
                        block: Box::new(random_block_with_parent(
                            &mut generators::rng(),
                            5,
                            Default::default(),
                        )),
                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                    }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 0,
                        bad_block: Some(5)
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );
    }

    /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
    #[tokio::test]
    async fn pipeline_error_handling() {
        // Non-fatal
        let provider_factory = create_test_provider_factory();
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("NonFatal"))
                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(result, Ok(()));

        // Fatal
        let provider_factory = create_test_provider_factory();
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
            )))
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(
            result,
            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
                ProviderError::BlockBodyIndicesNotFound(5)
            )))
        );
    }
}