reth_stages_api/pipeline/
mod.rs

1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, BlockNumReader,
11    ChainStateBlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory,
12    StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18    pin::Pin,
19    time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29    BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30    StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
37/// A container for a queued stage: a heap-allocated [`Stage`] trait object.
38pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
39
40/// A pinned, boxed, [`Send`] future that resolves to the owned pipeline together with the result
41/// of the pipeline run. See [`Pipeline::run_as_fut`].
42pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
43
44/// The owned [`Pipeline`] paired with the result of its run, as produced by [`Pipeline::run_as_fut`].
45pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
47#[cfg_attr(doc, aquamarine::aquamarine)]
48/// A staged sync pipeline.
49///
50/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
51/// of the chain and the pipeline then executes each stage in order from the current local chain tip
52/// to the external chain tip. When a stage is executed, it will run until it reaches the chain
53/// tip.
54///
55/// After the entire pipeline has been run, it will run again unless asked to stop (see
56/// [`Pipeline::set_max_block`]).
57///
58/// `include_mmd!("docs/mermaid/pipeline.mmd`")
59///
60/// # Unwinding
61///
62/// In case of a block validation or execution error, a detached head, or missing static file data
63/// in one of the stages, the pipeline will unwind the stages in reverse order of execution. It is
64/// also possible to request an unwind manually (see [`Pipeline::unwind`]).
65///
66/// # Defaults
67///
68/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
69pub struct Pipeline<N: ProviderNodeTypes> {
70    /// Provider factory used to open read-only and read-write database providers.
71    provider_factory: ProviderFactory<N>,
72    /// All configured stages in the order they will be executed.
73    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
74    /// The maximum block number to sync to.
75    max_block: Option<BlockNumber>,
    /// Producer used to copy data from the database to static files. It is locked while data is
    /// copied and for the whole duration of an unwind.
76    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
77    /// Sender for events the pipeline emits.
78    event_sender: EventSender<PipelineEvent>,
79    /// Keeps track of the progress of the pipeline.
80    progress: PipelineProgress,
81    /// A Sender for the current chain tip to sync to.
82    ///
83    /// This is used to notify the headers stage about a new sync target.
84    tip_tx: Option<watch::Sender<B256>>,
    /// Optional sender for metric events; when set, stage checkpoint updates are reported on it.
85    metrics_tx: Option<MetricEventsSender>,
86    /// Whether an unwind should fail the syncing process. Should only be set when downloading
87    /// blocks from trusted sources and expecting them to be valid.
88    fail_on_unwind: bool,
89    /// Block that was chosen as a target of the last unwind triggered by
90    /// [`StageError::DetachedHead`] error.
91    last_detached_head_unwind_target: Option<B256>,
92    /// Number of consecutive unwind attempts due to [`StageError::DetachedHead`] for the current
93    /// fork.
94    detached_head_attempts: u64,
95}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98    /// Construct a pipeline using a [`PipelineBuilder`].
99    pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100    {
101        PipelineBuilder::default()
102    }
103
104    /// Return the minimum block number achieved by
105    /// any stage during the execution of the pipeline.
106    pub const fn minimum_block_number(&self) -> Option<u64> {
107        self.progress.minimum_block_number
108    }
109
110    /// Set tip for reverse sync.
111    #[track_caller]
112    pub fn set_tip(&self, tip: B256) {
113        let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114            warn!(target: "sync::pipeline", "Chain tip channel closed");
115        });
116    }
117
118    /// Listen for events on the pipeline.
119    pub fn events(&self) -> EventStream<PipelineEvent> {
120        self.event_sender.new_listener()
121    }
122
123    /// Get a mutable reference to a stage by index.
124    pub fn stage(
125        &mut self,
126        idx: usize,
127    ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128        &mut self.stages[idx]
129    }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133    /// Registers progress metrics for each registered stage
134    pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135        let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136        let provider = self.provider_factory.provider()?;
137
138        for stage in &self.stages {
139            let stage_id = stage.id();
140            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141                stage_id,
142                checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143                max_block_number: None,
144                elapsed: Duration::default(),
145            });
146        }
147        Ok(())
148    }
149
150    /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
151    /// pipeline and its result as a future.
152    #[track_caller]
153    pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154        let _ = self.register_metrics();
155        Box::pin(async move {
156            // NOTE: the tip should only be None if we are in continuous sync mode.
157            if let Some(target) = target {
158                match target {
159                    PipelineTarget::Sync(tip) => self.set_tip(tip),
160                    PipelineTarget::Unwind(target) => {
161                        if let Err(err) = self.move_to_static_files() {
162                            return (self, Err(err.into()))
163                        }
164                        if let Err(err) = self.unwind(target, None) {
165                            return (self, Err(err))
166                        }
167                        self.progress.update(target);
168
169                        return (self, Ok(ControlFlow::Continue { block_number: target }))
170                    }
171                }
172            }
173
174            let result = self.run_loop().await;
175            trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176            (self, result)
177        })
178    }
179
180    /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
181    /// a `max_block` in the pipeline.
182    pub async fn run(&mut self) -> Result<(), PipelineError> {
183        let _ = self.register_metrics(); // ignore error
184
185        loop {
186            let next_action = self.run_loop().await?;
187
188            if next_action.is_unwind() && self.fail_on_unwind {
189                return Err(PipelineError::UnexpectedUnwind)
190            }
191
192            // Terminate the loop early if it's reached the maximum user
193            // configured block.
194            if next_action.should_continue() &&
195                self.progress
196                    .minimum_block_number
197                    .zip(self.max_block)
198                    .is_some_and(|(progress, target)| progress >= target)
199            {
200                trace!(
201                    target: "sync::pipeline",
202                    ?next_action,
203                    minimum_block_number = ?self.progress.minimum_block_number,
204                    max_block = ?self.max_block,
205                    "Terminating pipeline."
206                );
207                return Ok(())
208            }
209        }
210    }
211
212    /// Performs one pass of the pipeline across all stages. After successful
213    /// execution of each stage, it proceeds to commit it to the database.
214    ///
215    /// If any stage is unsuccessful at execution, we proceed to
216    /// unwind. This will undo the progress across the entire pipeline
217    /// up to the block that caused the error.
218    ///
219    /// Returns the control flow after it ran the pipeline.
220    /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage in
221    /// the pipeline (for example the `Finish` stage). Or [`ControlFlow::Unwind`] of the stage
222    /// that caused the unwind.
223    pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224        self.move_to_static_files()?;
225
226        let mut previous_stage = None;
227        for stage_index in 0..self.stages.len() {
228            let stage = &self.stages[stage_index];
229            let stage_id = stage.id();
230
231            trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232            let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234            trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236            match next {
237                ControlFlow::NoProgress { block_number } => {
238                    if let Some(block_number) = block_number {
239                        self.progress.update(block_number);
240                    }
241                }
242                ControlFlow::Continue { block_number } => self.progress.update(block_number),
243                ControlFlow::Unwind { target, bad_block } => {
244                    self.unwind(target, Some(bad_block.block.number))?;
245                    return Ok(ControlFlow::Unwind { target, bad_block })
246                }
247            }
248
249            previous_stage = Some(
250                self.provider_factory
251                    .provider()?
252                    .get_stage_checkpoint(stage_id)?
253                    .unwrap_or_default()
254                    .block_number,
255            );
256        }
257
258        Ok(self.progress.next_ctrl())
259    }
260
261    /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
262    /// all data from the database to static files for corresponding
263    /// [segments](reth_static_file_types::StaticFileSegment), according to their [stage
264    /// checkpoints](StageCheckpoint):
265    /// - [`StaticFileSegment::Headers`](reth_static_file_types::StaticFileSegment::Headers) ->
266    ///   [`StageId::Headers`]
267    /// - [`StaticFileSegment::Receipts`](reth_static_file_types::StaticFileSegment::Receipts) ->
268    ///   [`StageId::Execution`]
269    /// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
270    ///   -> [`StageId::Bodies`]
271    ///
272    /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
273    /// lock is occupied.
274    pub fn move_to_static_files(&self) -> RethResult<()> {
275        // Copies data from database to static files
276        let lowest_static_file_height =
277            self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279        // Deletes data which has been copied to static files.
280        if let Some(prune_tip) = lowest_static_file_height {
281            // Run the pruner so we don't potentially end up with higher height in the database vs
282            // static files during a pipeline unwind
283            let mut pruner = PrunerBuilder::new(Default::default())
284                .delete_limit(usize::MAX)
285                .build_with_provider_factory(self.provider_factory.clone());
286
287            pruner.run(prune_tip)?;
288        }
289
290        Ok(())
291    }
292
293    /// Unwind the stages to the target block (exclusive).
294    ///
295    /// If the unwind is due to a bad block the number of that block should be specified.
296    pub fn unwind(
297        &mut self,
298        to: BlockNumber,
299        bad_block: Option<BlockNumber>,
300    ) -> Result<(), PipelineError> {
301        // Add validation before starting unwind
302        let provider = self.provider_factory.provider()?;
303        let latest_block = provider.last_block_number()?;
304
305        // Get the actual pruning configuration
306        let prune_modes = provider.prune_modes_ref();
307
308        prune_modes.ensure_unwind_target_unpruned(latest_block, to)?;
309
310        // Unwind stages in reverse order of execution
311        let unwind_pipeline = self.stages.iter_mut().rev();
312
313        // Legacy Engine: This prevents a race condition in which the `StaticFileProducer` could
314        // attempt to proceed with a finalized block which has been unwinded
315        let _locked_sf_producer = self.static_file_producer.lock();
316
317        let mut provider_rw = self.provider_factory.database_provider_rw()?;
318
319        for stage in unwind_pipeline {
320            let stage_id = stage.id();
321            let span = info_span!("Unwinding", stage = %stage_id);
322            let _enter = span.enter();
323
324            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
325            if checkpoint.block_number < to {
326                debug!(
327                    target: "sync::pipeline",
328                    from = %checkpoint.block_number,
329                    %to,
330                    "Unwind point too far for stage"
331                );
332                self.event_sender.notify(PipelineEvent::Skipped { stage_id });
333
334                continue
335            }
336
337            info!(
338                target: "sync::pipeline",
339                from = %checkpoint.block_number,
340                %to,
341                ?bad_block,
342                "Starting unwind"
343            );
344            while checkpoint.block_number > to {
345                let unwind_started_at = Instant::now();
346                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
347                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
348
349                let output = stage.unwind(&provider_rw, input);
350                match output {
351                    Ok(unwind_output) => {
352                        checkpoint = unwind_output.checkpoint;
353                        info!(
354                            target: "sync::pipeline",
355                            stage = %stage_id,
356                            unwind_to = to,
357                            progress = checkpoint.block_number,
358                            done = checkpoint.block_number == to,
359                            "Stage unwound"
360                        );
361
362                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
363
364                        // Notify event listeners and update metrics.
365                        self.event_sender
366                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
367
368                        if let Some(metrics_tx) = &mut self.metrics_tx {
369                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
370                                stage_id,
371                                checkpoint,
372                                // We assume it was set in the previous execute iteration, so it
373                                // doesn't change when we unwind.
374                                max_block_number: None,
375                                elapsed: unwind_started_at.elapsed(),
376                            });
377                        }
378
379                        // update finalized block if needed
380                        let last_saved_finalized_block_number =
381                            provider_rw.last_finalized_block_number()?;
382
383                        // If None, that means the finalized block is not written so we should
384                        // always save in that case
385                        if last_saved_finalized_block_number.is_none() ||
386                            Some(checkpoint.block_number) < last_saved_finalized_block_number
387                        {
388                            provider_rw.save_finalized_block_number(BlockNumber::from(
389                                checkpoint.block_number,
390                            ))?;
391                        }
392
393                        UnifiedStorageWriter::commit_unwind(provider_rw)?;
394
395                        stage.post_unwind_commit()?;
396
397                        provider_rw = self.provider_factory.database_provider_rw()?;
398                    }
399                    Err(err) => {
400                        self.event_sender.notify(PipelineEvent::Error { stage_id });
401
402                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
403                    }
404                }
405            }
406        }
407
408        Ok(())
409    }
410
411    async fn execute_stage_to_completion(
412        &mut self,
413        previous_stage: Option<BlockNumber>,
414        stage_index: usize,
415    ) -> Result<ControlFlow, PipelineError> {
416        let total_stages = self.stages.len();
417
418        let stage_id = self.stage(stage_index).id();
419        let mut made_progress = false;
420        let target = self.max_block.or(previous_stage);
421
422        loop {
423            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
424
425            let stage_reached_max_block = prev_checkpoint
426                .zip(self.max_block)
427                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
428            if stage_reached_max_block {
429                warn!(
430                    target: "sync::pipeline",
431                    stage = %stage_id,
432                    max_block = self.max_block,
433                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
434                    "Stage reached target block, skipping."
435                );
436                self.event_sender.notify(PipelineEvent::Skipped { stage_id });
437
438                // We reached the maximum block, so we skip the stage
439                return Ok(ControlFlow::NoProgress {
440                    block_number: prev_checkpoint.map(|progress| progress.block_number),
441                })
442            }
443
444            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
445
446            self.event_sender.notify(PipelineEvent::Prepare {
447                pipeline_stages_progress: PipelineStagesProgress {
448                    current: stage_index + 1,
449                    total: total_stages,
450                },
451                stage_id,
452                checkpoint: prev_checkpoint,
453                target,
454            });
455
456            if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
457                self.event_sender.notify(PipelineEvent::Error { stage_id });
458                match self.on_stage_error(stage_id, prev_checkpoint, err)? {
459                    Some(ctrl) => return Ok(ctrl),
460                    None => continue,
461                };
462            }
463
464            let stage_started_at = Instant::now();
465            let provider_rw = self.provider_factory.database_provider_rw()?;
466
467            self.event_sender.notify(PipelineEvent::Run {
468                pipeline_stages_progress: PipelineStagesProgress {
469                    current: stage_index + 1,
470                    total: total_stages,
471                },
472                stage_id,
473                checkpoint: prev_checkpoint,
474                target,
475            });
476
477            match self.stage(stage_index).execute(&provider_rw, exec_input) {
478                Ok(out @ ExecOutput { checkpoint, done }) => {
479                    // Update stage checkpoint.
480                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
481
482                    // Commit processed data to the database.
483                    UnifiedStorageWriter::commit(provider_rw)?;
484
485                    // Invoke stage post commit hook.
486                    self.stage(stage_index).post_execute_commit()?;
487
488                    // Notify event listeners and update metrics.
489                    self.event_sender.notify(PipelineEvent::Ran {
490                        pipeline_stages_progress: PipelineStagesProgress {
491                            current: stage_index + 1,
492                            total: total_stages,
493                        },
494                        stage_id,
495                        result: out.clone(),
496                    });
497                    if let Some(metrics_tx) = &mut self.metrics_tx {
498                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
499                            stage_id,
500                            checkpoint,
501                            max_block_number: target,
502                            elapsed: stage_started_at.elapsed(),
503                        });
504                    }
505
506                    let block_number = checkpoint.block_number;
507                    let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
508                    made_progress |= block_number != prev_block_number;
509                    if done {
510                        return Ok(if made_progress {
511                            ControlFlow::Continue { block_number }
512                        } else {
513                            ControlFlow::NoProgress { block_number: Some(block_number) }
514                        })
515                    }
516                }
517                Err(err) => {
518                    drop(provider_rw);
519                    self.event_sender.notify(PipelineEvent::Error { stage_id });
520
521                    if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
522                        return Ok(ctrl)
523                    }
524                }
525            }
526        }
527    }
528
529    fn on_stage_error(
530        &mut self,
531        stage_id: StageId,
532        prev_checkpoint: Option<StageCheckpoint>,
533        err: StageError,
534    ) -> Result<Option<ControlFlow>, PipelineError> {
535        if let StageError::DetachedHead { local_head, header, error } = err {
536            warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
537
538            if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
539                if local_head.block.hash == last_detached_head_unwind_target &&
540                    header.block.number == local_head.block.number + 1
541                {
542                    self.detached_head_attempts += 1;
543                } else {
544                    self.detached_head_attempts = 1;
545                }
546            } else {
547                self.detached_head_attempts = 1;
548            }
549
550            // We unwind because of a detached head.
551            let unwind_to = local_head
552                .block
553                .number
554                .saturating_sub(
555                    BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
556                )
557                .max(1);
558
559            self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
560            Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
561        } else if let StageError::Block { block, error } = err {
562            match error {
563                BlockErrorKind::Validation(validation_error) => {
564                    error!(
565                        target: "sync::pipeline",
566                        stage = %stage_id,
567                        bad_block = %block.block.number,
568                        "Stage encountered a validation error: {validation_error}"
569                    );
570
571                    // FIXME: When handling errors, we do not commit the database transaction. This
572                    // leads to the Merkle stage not clearing its checkpoint, and restarting from an
573                    // invalid place.
574                    let provider_rw = self.provider_factory.database_provider_rw()?;
575                    provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
576                    provider_rw.save_stage_checkpoint(
577                        StageId::MerkleExecute,
578                        prev_checkpoint.unwrap_or_default(),
579                    )?;
580
581                    UnifiedStorageWriter::commit(provider_rw)?;
582
583                    // We unwind because of a validation error. If the unwind itself
584                    // fails, we bail entirely,
585                    // otherwise we restart the execution loop from the
586                    // beginning.
587                    Ok(Some(ControlFlow::Unwind {
588                        target: prev_checkpoint.unwrap_or_default().block_number,
589                        bad_block: block,
590                    }))
591                }
592                BlockErrorKind::Execution(execution_error) => {
593                    error!(
594                        target: "sync::pipeline",
595                        stage = %stage_id,
596                        bad_block = %block.block.number,
597                        "Stage encountered an execution error: {execution_error}"
598                    );
599
600                    // We unwind because of an execution error. If the unwind itself
601                    // fails, we bail entirely,
602                    // otherwise we restart
603                    // the execution loop from the beginning.
604                    Ok(Some(ControlFlow::Unwind {
605                        target: prev_checkpoint.unwrap_or_default().block_number,
606                        bad_block: block,
607                    }))
608                }
609            }
610        } else if let StageError::MissingStaticFileData { block, segment } = err {
611            error!(
612                target: "sync::pipeline",
613                stage = %stage_id,
614                bad_block = %block.block.number,
615                segment = %segment,
616                "Stage is missing static file data."
617            );
618
619            Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
620        } else if err.is_fatal() {
621            error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
622            Err(err.into())
623        } else {
624            // On other errors we assume they are recoverable if we discard the
625            // transaction and run the stage again.
626            warn!(
627                target: "sync::pipeline",
628                stage = %stage_id,
629                "Stage encountered a non-fatal error: {err}. Retrying..."
630            );
631            Ok(None)
632        }
633    }
634}
635
636impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
637    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638        f.debug_struct("Pipeline")
639            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
640            .field("max_block", &self.max_block)
641            .field("event_sender", &self.event_sender)
642            .field("fail_on_unwind", &self.fail_on_unwind)
643            .finish()
644    }
645}
646
647#[cfg(test)]
648mod tests {
649    use std::sync::atomic::Ordering;
650
651    use super::*;
652    use crate::{test_utils::TestStage, UnwindOutput};
653    use assert_matches::assert_matches;
654    use reth_consensus::ConsensusError;
655    use reth_errors::ProviderError;
656    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
657    use reth_prune::PruneModes;
658    use reth_testing_utils::generators::{self, random_block_with_parent};
659    use tokio_stream::StreamExt;
660
661    #[test]
662    fn record_progress_calculates_outliers() {
663        let mut progress = PipelineProgress::default();
664
665        progress.update(10);
666        assert_eq!(progress.minimum_block_number, Some(10));
667        assert_eq!(progress.maximum_block_number, Some(10));
668
669        progress.update(20);
670        assert_eq!(progress.minimum_block_number, Some(10));
671        assert_eq!(progress.maximum_block_number, Some(20));
672
673        progress.update(1);
674        assert_eq!(progress.minimum_block_number, Some(1));
675        assert_eq!(progress.maximum_block_number, Some(20));
676    }
677
678    #[test]
679    fn progress_ctrl_flow() {
680        let mut progress = PipelineProgress::default();
681
682        assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
683
684        progress.update(1);
685        assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
686    }
687
688    /// Runs a simple pipeline.
689    #[tokio::test]
690    async fn run_pipeline() {
691        let provider_factory = create_test_provider_factory();
692
693        let stage_a = TestStage::new(StageId::Other("A"))
694            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
695        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
696        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
697
698        let stage_b = TestStage::new(StageId::Other("B"))
699            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
700        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
701        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
702
703        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
704            .add_stage(stage_a)
705            .add_stage(stage_b)
706            .with_max_block(10)
707            .build(
708                provider_factory.clone(),
709                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
710            );
711        let events = pipeline.events();
712
713        // Run pipeline
714        tokio::spawn(async move {
715            pipeline.run().await.unwrap();
716        });
717
718        // Check that the stages were run in order
719        assert_eq!(
720            events.collect::<Vec<PipelineEvent>>().await,
721            vec![
722                PipelineEvent::Prepare {
723                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
724                    stage_id: StageId::Other("A"),
725                    checkpoint: None,
726                    target: Some(10),
727                },
728                PipelineEvent::Run {
729                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
730                    stage_id: StageId::Other("A"),
731                    checkpoint: None,
732                    target: Some(10),
733                },
734                PipelineEvent::Ran {
735                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
736                    stage_id: StageId::Other("A"),
737                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
738                },
739                PipelineEvent::Prepare {
740                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
741                    stage_id: StageId::Other("B"),
742                    checkpoint: None,
743                    target: Some(10),
744                },
745                PipelineEvent::Run {
746                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
747                    stage_id: StageId::Other("B"),
748                    checkpoint: None,
749                    target: Some(10),
750                },
751                PipelineEvent::Ran {
752                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
753                    stage_id: StageId::Other("B"),
754                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
755                },
756            ]
757        );
758
759        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
760        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
761
762        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
763        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
764    }
765
766    /// Unwinds a simple pipeline.
767    #[tokio::test]
768    async fn unwind_pipeline() {
769        let provider_factory = create_test_provider_factory();
770
771        let stage_a = TestStage::new(StageId::Other("A"))
772            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
773            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
774        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
775        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
776
777        let stage_b = TestStage::new(StageId::Other("B"))
778            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
779            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
780        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
781        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
782
783        let stage_c = TestStage::new(StageId::Other("C"))
784            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
785            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
786        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
787        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
788
789        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
790            .add_stage(stage_a)
791            .add_stage(stage_b)
792            .add_stage(stage_c)
793            .with_max_block(10)
794            .build(
795                provider_factory.clone(),
796                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
797            );
798        let events = pipeline.events();
799
800        // Run pipeline
801        tokio::spawn(async move {
802            // Sync first
803            pipeline.run().await.expect("Could not run pipeline");
804
805            // Unwind
806            pipeline.unwind(1, None).expect("Could not unwind pipeline");
807        });
808
809        // Check that the stages were unwound in reverse order
810        assert_eq!(
811            events.collect::<Vec<PipelineEvent>>().await,
812            vec![
813                // Executing
814                PipelineEvent::Prepare {
815                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
816                    stage_id: StageId::Other("A"),
817                    checkpoint: None,
818                    target: Some(10),
819                },
820                PipelineEvent::Run {
821                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
822                    stage_id: StageId::Other("A"),
823                    checkpoint: None,
824                    target: Some(10),
825                },
826                PipelineEvent::Ran {
827                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
828                    stage_id: StageId::Other("A"),
829                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
830                },
831                PipelineEvent::Prepare {
832                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
833                    stage_id: StageId::Other("B"),
834                    checkpoint: None,
835                    target: Some(10),
836                },
837                PipelineEvent::Run {
838                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
839                    stage_id: StageId::Other("B"),
840                    checkpoint: None,
841                    target: Some(10),
842                },
843                PipelineEvent::Ran {
844                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
845                    stage_id: StageId::Other("B"),
846                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
847                },
848                PipelineEvent::Prepare {
849                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
850                    stage_id: StageId::Other("C"),
851                    checkpoint: None,
852                    target: Some(10),
853                },
854                PipelineEvent::Run {
855                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
856                    stage_id: StageId::Other("C"),
857                    checkpoint: None,
858                    target: Some(10),
859                },
860                PipelineEvent::Ran {
861                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
862                    stage_id: StageId::Other("C"),
863                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
864                },
865                // Unwinding
866                PipelineEvent::Unwind {
867                    stage_id: StageId::Other("C"),
868                    input: UnwindInput {
869                        checkpoint: StageCheckpoint::new(20),
870                        unwind_to: 1,
871                        bad_block: None
872                    }
873                },
874                PipelineEvent::Unwound {
875                    stage_id: StageId::Other("C"),
876                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
877                },
878                PipelineEvent::Unwind {
879                    stage_id: StageId::Other("B"),
880                    input: UnwindInput {
881                        checkpoint: StageCheckpoint::new(10),
882                        unwind_to: 1,
883                        bad_block: None
884                    }
885                },
886                PipelineEvent::Unwound {
887                    stage_id: StageId::Other("B"),
888                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
889                },
890                PipelineEvent::Unwind {
891                    stage_id: StageId::Other("A"),
892                    input: UnwindInput {
893                        checkpoint: StageCheckpoint::new(100),
894                        unwind_to: 1,
895                        bad_block: None
896                    }
897                },
898                PipelineEvent::Unwound {
899                    stage_id: StageId::Other("A"),
900                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
901                },
902            ]
903        );
904
905        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
906        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
907
908        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
909        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
910
911        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
912        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
913    }
914
915    /// Unwinds a pipeline with intermediate progress.
916    #[tokio::test]
917    async fn unwind_pipeline_with_intermediate_progress() {
918        let provider_factory = create_test_provider_factory();
919
920        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
921            .add_stage(
922                TestStage::new(StageId::Other("A"))
923                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
924                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
925            )
926            .add_stage(
927                TestStage::new(StageId::Other("B"))
928                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
929            )
930            .with_max_block(10)
931            .build(
932                provider_factory.clone(),
933                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
934            );
935        let events = pipeline.events();
936
937        // Run pipeline
938        tokio::spawn(async move {
939            // Sync first
940            pipeline.run().await.expect("Could not run pipeline");
941
942            // Unwind
943            pipeline.unwind(50, None).expect("Could not unwind pipeline");
944        });
945
946        // Check that the stages were unwound in reverse order
947        assert_eq!(
948            events.collect::<Vec<PipelineEvent>>().await,
949            vec![
950                // Executing
951                PipelineEvent::Prepare {
952                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
953                    stage_id: StageId::Other("A"),
954                    checkpoint: None,
955                    target: Some(10),
956                },
957                PipelineEvent::Run {
958                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
959                    stage_id: StageId::Other("A"),
960                    checkpoint: None,
961                    target: Some(10),
962                },
963                PipelineEvent::Ran {
964                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
965                    stage_id: StageId::Other("A"),
966                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
967                },
968                PipelineEvent::Prepare {
969                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
970                    stage_id: StageId::Other("B"),
971                    checkpoint: None,
972                    target: Some(10),
973                },
974                PipelineEvent::Run {
975                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
976                    stage_id: StageId::Other("B"),
977                    checkpoint: None,
978                    target: Some(10),
979                },
980                PipelineEvent::Ran {
981                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
982                    stage_id: StageId::Other("B"),
983                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
984                },
985                // Unwinding
986                // Nothing to unwind in stage "B"
987                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
988                PipelineEvent::Unwind {
989                    stage_id: StageId::Other("A"),
990                    input: UnwindInput {
991                        checkpoint: StageCheckpoint::new(100),
992                        unwind_to: 50,
993                        bad_block: None
994                    }
995                },
996                PipelineEvent::Unwound {
997                    stage_id: StageId::Other("A"),
998                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
999                },
1000            ]
1001        );
1002    }
1003
1004    /// Runs a pipeline that unwinds during sync.
1005    ///
1006    /// The flow is:
1007    ///
1008    /// - Stage A syncs to block 10
1009    /// - Stage B triggers an unwind, marking block 5 as bad
1010    /// - Stage B unwinds to its previous progress, block 0 but since it is still at block 0, it is
1011    ///   skipped entirely (there is nothing to unwind)
1012    /// - Stage A unwinds to its previous progress, block 0
1013    /// - Stage A syncs back up to block 10
1014    /// - Stage B syncs to block 10
1015    /// - The pipeline finishes
1016    #[tokio::test]
1017    async fn run_pipeline_with_unwind() {
1018        let provider_factory = create_test_provider_factory();
1019
1020        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1021            .add_stage(
1022                TestStage::new(StageId::Other("A"))
1023                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
1024                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1025                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1026            )
1027            .add_stage(
1028                TestStage::new(StageId::Other("B"))
1029                    .add_exec(Err(StageError::Block {
1030                        block: Box::new(random_block_with_parent(
1031                            &mut generators::rng(),
1032                            5,
1033                            Default::default(),
1034                        )),
1035                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
1036                    }))
1037                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1038                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1039            )
1040            .with_max_block(10)
1041            .build(
1042                provider_factory.clone(),
1043                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1044            );
1045        let events = pipeline.events();
1046
1047        // Run pipeline
1048        tokio::spawn(async move {
1049            pipeline.run().await.expect("Could not run pipeline");
1050        });
1051
1052        // Check that the stages were unwound in reverse order
1053        assert_eq!(
1054            events.collect::<Vec<PipelineEvent>>().await,
1055            vec![
1056                PipelineEvent::Prepare {
1057                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1058                    stage_id: StageId::Other("A"),
1059                    checkpoint: None,
1060                    target: Some(10),
1061                },
1062                PipelineEvent::Run {
1063                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1064                    stage_id: StageId::Other("A"),
1065                    checkpoint: None,
1066                    target: Some(10),
1067                },
1068                PipelineEvent::Ran {
1069                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1070                    stage_id: StageId::Other("A"),
1071                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1072                },
1073                PipelineEvent::Prepare {
1074                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1075                    stage_id: StageId::Other("B"),
1076                    checkpoint: None,
1077                    target: Some(10),
1078                },
1079                PipelineEvent::Run {
1080                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1081                    stage_id: StageId::Other("B"),
1082                    checkpoint: None,
1083                    target: Some(10),
1084                },
1085                PipelineEvent::Error { stage_id: StageId::Other("B") },
1086                PipelineEvent::Unwind {
1087                    stage_id: StageId::Other("A"),
1088                    input: UnwindInput {
1089                        checkpoint: StageCheckpoint::new(10),
1090                        unwind_to: 0,
1091                        bad_block: Some(5)
1092                    }
1093                },
1094                PipelineEvent::Unwound {
1095                    stage_id: StageId::Other("A"),
1096                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
1097                },
1098                PipelineEvent::Prepare {
1099                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1100                    stage_id: StageId::Other("A"),
1101                    checkpoint: Some(StageCheckpoint::new(0)),
1102                    target: Some(10),
1103                },
1104                PipelineEvent::Run {
1105                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1106                    stage_id: StageId::Other("A"),
1107                    checkpoint: Some(StageCheckpoint::new(0)),
1108                    target: Some(10),
1109                },
1110                PipelineEvent::Ran {
1111                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1112                    stage_id: StageId::Other("A"),
1113                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1114                },
1115                PipelineEvent::Prepare {
1116                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1117                    stage_id: StageId::Other("B"),
1118                    checkpoint: None,
1119                    target: Some(10),
1120                },
1121                PipelineEvent::Run {
1122                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1123                    stage_id: StageId::Other("B"),
1124                    checkpoint: None,
1125                    target: Some(10),
1126                },
1127                PipelineEvent::Ran {
1128                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1129                    stage_id: StageId::Other("B"),
1130                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1131                },
1132            ]
1133        );
1134    }
1135
1136    /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
1137    #[tokio::test]
1138    async fn pipeline_error_handling() {
1139        // Non-fatal
1140        let provider_factory = create_test_provider_factory();
1141        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1142            .add_stage(
1143                TestStage::new(StageId::Other("NonFatal"))
1144                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1145                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1146            )
1147            .with_max_block(10)
1148            .build(
1149                provider_factory.clone(),
1150                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1151            );
1152        let result = pipeline.run().await;
1153        assert_matches!(result, Ok(()));
1154
1155        // Fatal
1156        let provider_factory = create_test_provider_factory();
1157        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1158            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1159                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1160            )))
1161            .build(
1162                provider_factory.clone(),
1163                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1164            );
1165        let result = pipeline.run().await;
1166        assert_matches!(
1167            result,
1168            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1169                ProviderError::BlockBodyIndicesNotFound(5)
1170            )))
1171        );
1172    }
1173}