reth_stages_api/pipeline/
mod.rs

1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10    providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11    ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12    PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18    pin::Pin,
19    time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29    BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30    StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A container for a queued stage.
///
/// The stage is boxed as a trait object so that different stage implementations can be stored in
/// one list. `DB` is forwarded unchanged as the type parameter of [`Stage`].
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [`Pipeline::run_as_fut`].
///
/// The future is pinned, boxed and `Send`.
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The pipeline type itself together with the result of [`Pipeline::run_as_fut`].
///
/// The first element hands ownership of the pipeline back to the caller; the second element is the
/// [`ControlFlow`] the run ended with, or the [`PipelineError`] that aborted it.
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
/// of the chain and the pipeline then executes each stage in order, from the current local chain
/// tip to the external chain tip. When a stage is executed, it will run until it reaches the chain
/// tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [`Pipeline::set_max_block`]).
///
/// `include_mmd!("docs/mermaid/pipeline.mmd`")
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [`Pipeline::unwind`]).
///
/// # Defaults
///
/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Provider factory.
    provider_factory: ProviderFactory<N>,
    /// All configured stages in the order they will be executed.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// The maximum block number to sync to.
    max_block: Option<BlockNumber>,
    /// Producer used by [`Pipeline::move_to_static_files`] to copy data from the database into
    /// static files. Its lock is also held while [`Pipeline::unwind`] runs.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender for events the pipeline emits.
    event_sender: EventSender<PipelineEvent>,
    /// Keeps track of the progress of the pipeline.
    progress: PipelineProgress,
    /// A Sender for the current chain tip to sync to.
    ///
    /// This is used to notify the headers stage about a new sync target.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional sender for [`MetricEvent`]s. When set, a [`MetricEvent::StageCheckpoint`] is sent
    /// whenever a stage checkpoint is registered, executed or unwound.
    metrics_tx: Option<MetricEventsSender>,
    /// Whether an unwind should fail the syncing process. Should only be set when downloading
    /// blocks from trusted sources and expecting them to be valid.
    fail_on_unwind: bool,
    /// Block that was chosen as a target of the last unwind triggered by
    /// [`StageError::DetachedHead`] error.
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive unwind attempts due to [`StageError::DetachedHead`] for the current
    /// fork.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98    /// Construct a pipeline using a [`PipelineBuilder`].
99    pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100    {
101        PipelineBuilder::default()
102    }
103
104    /// Return the minimum block number achieved by
105    /// any stage during the execution of the pipeline.
106    pub const fn minimum_block_number(&self) -> Option<u64> {
107        self.progress.minimum_block_number
108    }
109
110    /// Set tip for reverse sync.
111    #[track_caller]
112    pub fn set_tip(&self, tip: B256) {
113        let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114            warn!(target: "sync::pipeline", "Chain tip channel closed");
115        });
116    }
117
118    /// Listen for events on the pipeline.
119    pub fn events(&self) -> EventStream<PipelineEvent> {
120        self.event_sender.new_listener()
121    }
122
123    /// Get a mutable reference to a stage by index.
124    pub fn stage(
125        &mut self,
126        idx: usize,
127    ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128        &mut self.stages[idx]
129    }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133    /// Registers progress metrics for each registered stage
134    pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135        let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136        let provider = self.provider_factory.provider()?;
137
138        for stage in &self.stages {
139            let stage_id = stage.id();
140            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141                stage_id,
142                checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143                max_block_number: None,
144                elapsed: Duration::default(),
145            });
146        }
147        Ok(())
148    }
149
150    /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
151    /// pipeline and its result as a future.
152    #[track_caller]
153    pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154        let _ = self.register_metrics();
155        Box::pin(async move {
156            // NOTE: the tip should only be None if we are in continuous sync mode.
157            if let Some(target) = target {
158                match target {
159                    PipelineTarget::Sync(tip) => self.set_tip(tip),
160                    PipelineTarget::Unwind(target) => {
161                        if let Err(err) = self.move_to_static_files() {
162                            return (self, Err(err.into()))
163                        }
164                        if let Err(err) = self.unwind(target, None) {
165                            return (self, Err(err))
166                        }
167                        self.progress.update(target);
168
169                        return (self, Ok(ControlFlow::Continue { block_number: target }))
170                    }
171                }
172            }
173
174            let result = self.run_loop().await;
175            trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176            (self, result)
177        })
178    }
179
180    /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
181    /// a `max_block` in the pipeline.
182    pub async fn run(&mut self) -> Result<(), PipelineError> {
183        let _ = self.register_metrics(); // ignore error
184
185        loop {
186            let next_action = self.run_loop().await?;
187
188            if next_action.is_unwind() && self.fail_on_unwind {
189                return Err(PipelineError::UnexpectedUnwind)
190            }
191
192            // Terminate the loop early if it's reached the maximum user
193            // configured block.
194            if next_action.should_continue() &&
195                self.progress
196                    .minimum_block_number
197                    .zip(self.max_block)
198                    .is_some_and(|(progress, target)| progress >= target)
199            {
200                trace!(
201                    target: "sync::pipeline",
202                    ?next_action,
203                    minimum_block_number = ?self.progress.minimum_block_number,
204                    max_block = ?self.max_block,
205                    "Terminating pipeline."
206                );
207                return Ok(())
208            }
209        }
210    }
211
212    /// Performs one pass of the pipeline across all stages. After successful
213    /// execution of each stage, it proceeds to commit it to the database.
214    ///
215    /// If any stage is unsuccessful at execution, we proceed to
216    /// unwind. This will undo the progress across the entire pipeline
217    /// up to the block that caused the error.
218    ///
219    /// Returns the control flow after it ran the pipeline.
220    /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage in
221    /// the pipeline (for example the `Finish` stage). Or [`ControlFlow::Unwind`] of the stage
222    /// that caused the unwind.
223    pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224        self.move_to_static_files()?;
225
226        let mut previous_stage = None;
227        for stage_index in 0..self.stages.len() {
228            let stage = &self.stages[stage_index];
229            let stage_id = stage.id();
230
231            trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232            let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234            trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236            match next {
237                ControlFlow::NoProgress { block_number } => {
238                    if let Some(block_number) = block_number {
239                        self.progress.update(block_number);
240                    }
241                }
242                ControlFlow::Continue { block_number } => self.progress.update(block_number),
243                ControlFlow::Unwind { target, bad_block } => {
244                    self.unwind(target, Some(bad_block.block.number))?;
245                    return Ok(ControlFlow::Unwind { target, bad_block })
246                }
247            }
248
249            previous_stage = Some(
250                self.provider_factory
251                    .provider()?
252                    .get_stage_checkpoint(stage_id)?
253                    .unwrap_or_default()
254                    .block_number,
255            );
256        }
257
258        Ok(self.progress.next_ctrl())
259    }
260
261    /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
262    /// all data from the database to static files for corresponding
263    /// [segments](reth_static_file_types::StaticFileSegment), according to their [stage
264    /// checkpoints](StageCheckpoint):
265    /// - [`StaticFileSegment::Headers`](reth_static_file_types::StaticFileSegment::Headers) ->
266    ///   [`StageId::Headers`]
267    /// - [`StaticFileSegment::Receipts`](reth_static_file_types::StaticFileSegment::Receipts) ->
268    ///   [`StageId::Execution`]
269    /// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
270    ///   -> [`StageId::Bodies`]
271    ///
272    /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
273    /// lock is occupied.
274    pub fn move_to_static_files(&self) -> RethResult<()> {
275        // Copies data from database to static files
276        let lowest_static_file_height =
277            self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279        // Deletes data which has been copied to static files.
280        if let Some(prune_tip) = lowest_static_file_height {
281            // Run the pruner so we don't potentially end up with higher height in the database vs
282            // static files during a pipeline unwind
283            let mut pruner = PrunerBuilder::new(Default::default())
284                .delete_limit(usize::MAX)
285                .build_with_provider_factory(self.provider_factory.clone());
286
287            pruner.run(prune_tip)?;
288        }
289
290        Ok(())
291    }
292
293    /// Unwind the stages to the target block (exclusive).
294    ///
295    /// If the unwind is due to a bad block the number of that block should be specified.
296    pub fn unwind(
297        &mut self,
298        to: BlockNumber,
299        bad_block: Option<BlockNumber>,
300    ) -> Result<(), PipelineError> {
301        // Add validation before starting unwind
302        let provider = self.provider_factory.provider()?;
303        let latest_block = provider.last_block_number()?;
304
305        // Get the actual pruning configuration
306        let prune_modes = provider.prune_modes_ref();
307
308        let checkpoints = provider.get_prune_checkpoints()?;
309        prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;
310
311        // Unwind stages in reverse order of execution
312        let unwind_pipeline = self.stages.iter_mut().rev();
313
314        // Legacy Engine: This prevents a race condition in which the `StaticFileProducer` could
315        // attempt to proceed with a finalized block which has been unwinded
316        let _locked_sf_producer = self.static_file_producer.lock();
317
318        let mut provider_rw = self.provider_factory.database_provider_rw()?;
319
320        for stage in unwind_pipeline {
321            let stage_id = stage.id();
322            let span = info_span!("Unwinding", stage = %stage_id);
323            let _enter = span.enter();
324
325            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
326            if checkpoint.block_number < to {
327                debug!(
328                    target: "sync::pipeline",
329                    from = %checkpoint.block_number,
330                    %to,
331                    "Unwind point too far for stage"
332                );
333                self.event_sender.notify(PipelineEvent::Skipped { stage_id });
334
335                continue
336            }
337
338            info!(
339                target: "sync::pipeline",
340                from = %checkpoint.block_number,
341                %to,
342                ?bad_block,
343                "Starting unwind"
344            );
345            while checkpoint.block_number > to {
346                let unwind_started_at = Instant::now();
347                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
348                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
349
350                let output = stage.unwind(&provider_rw, input);
351                match output {
352                    Ok(unwind_output) => {
353                        checkpoint = unwind_output.checkpoint;
354                        info!(
355                            target: "sync::pipeline",
356                            stage = %stage_id,
357                            unwind_to = to,
358                            progress = checkpoint.block_number,
359                            done = checkpoint.block_number == to,
360                            "Stage unwound"
361                        );
362
363                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
364
365                        // Notify event listeners and update metrics.
366                        self.event_sender
367                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
368
369                        if let Some(metrics_tx) = &mut self.metrics_tx {
370                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
371                                stage_id,
372                                checkpoint,
373                                // We assume it was set in the previous execute iteration, so it
374                                // doesn't change when we unwind.
375                                max_block_number: None,
376                                elapsed: unwind_started_at.elapsed(),
377                            });
378                        }
379
380                        // update finalized block if needed
381                        let last_saved_finalized_block_number =
382                            provider_rw.last_finalized_block_number()?;
383
384                        // If None, that means the finalized block is not written so we should
385                        // always save in that case
386                        if last_saved_finalized_block_number.is_none() ||
387                            Some(checkpoint.block_number) < last_saved_finalized_block_number
388                        {
389                            provider_rw.save_finalized_block_number(BlockNumber::from(
390                                checkpoint.block_number,
391                            ))?;
392                        }
393
394                        provider_rw.commit()?;
395
396                        stage.post_unwind_commit()?;
397
398                        provider_rw = self.provider_factory.database_provider_rw()?;
399                    }
400                    Err(err) => {
401                        self.event_sender.notify(PipelineEvent::Error { stage_id });
402
403                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
404                    }
405                }
406            }
407        }
408
409        Ok(())
410    }
411
412    async fn execute_stage_to_completion(
413        &mut self,
414        previous_stage: Option<BlockNumber>,
415        stage_index: usize,
416    ) -> Result<ControlFlow, PipelineError> {
417        let total_stages = self.stages.len();
418
419        let stage_id = self.stage(stage_index).id();
420        let mut made_progress = false;
421        let target = self.max_block.or(previous_stage);
422
423        loop {
424            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
425
426            let stage_reached_max_block = prev_checkpoint
427                .zip(self.max_block)
428                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
429            if stage_reached_max_block {
430                warn!(
431                    target: "sync::pipeline",
432                    stage = %stage_id,
433                    max_block = self.max_block,
434                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
435                    "Stage reached target block, skipping."
436                );
437                self.event_sender.notify(PipelineEvent::Skipped { stage_id });
438
439                // We reached the maximum block, so we skip the stage
440                return Ok(ControlFlow::NoProgress {
441                    block_number: prev_checkpoint.map(|progress| progress.block_number),
442                })
443            }
444
445            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
446
447            self.event_sender.notify(PipelineEvent::Prepare {
448                pipeline_stages_progress: PipelineStagesProgress {
449                    current: stage_index + 1,
450                    total: total_stages,
451                },
452                stage_id,
453                checkpoint: prev_checkpoint,
454                target,
455            });
456
457            if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
458                self.event_sender.notify(PipelineEvent::Error { stage_id });
459                match self.on_stage_error(stage_id, prev_checkpoint, err)? {
460                    Some(ctrl) => return Ok(ctrl),
461                    None => continue,
462                };
463            }
464
465            let stage_started_at = Instant::now();
466            let provider_rw = self.provider_factory.database_provider_rw()?;
467
468            self.event_sender.notify(PipelineEvent::Run {
469                pipeline_stages_progress: PipelineStagesProgress {
470                    current: stage_index + 1,
471                    total: total_stages,
472                },
473                stage_id,
474                checkpoint: prev_checkpoint,
475                target,
476            });
477
478            match self.stage(stage_index).execute(&provider_rw, exec_input) {
479                Ok(out @ ExecOutput { checkpoint, done }) => {
480                    // Update stage checkpoint.
481                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
482
483                    // Commit processed data to the database.
484                    provider_rw.commit()?;
485
486                    // Invoke stage post commit hook.
487                    self.stage(stage_index).post_execute_commit()?;
488
489                    // Notify event listeners and update metrics.
490                    self.event_sender.notify(PipelineEvent::Ran {
491                        pipeline_stages_progress: PipelineStagesProgress {
492                            current: stage_index + 1,
493                            total: total_stages,
494                        },
495                        stage_id,
496                        result: out.clone(),
497                    });
498                    if let Some(metrics_tx) = &mut self.metrics_tx {
499                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
500                            stage_id,
501                            checkpoint,
502                            max_block_number: target,
503                            elapsed: stage_started_at.elapsed(),
504                        });
505                    }
506
507                    let block_number = checkpoint.block_number;
508                    let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
509                    made_progress |= block_number != prev_block_number;
510                    if done {
511                        return Ok(if made_progress {
512                            ControlFlow::Continue { block_number }
513                        } else {
514                            ControlFlow::NoProgress { block_number: Some(block_number) }
515                        })
516                    }
517                }
518                Err(err) => {
519                    drop(provider_rw);
520                    self.event_sender.notify(PipelineEvent::Error { stage_id });
521
522                    if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
523                        return Ok(ctrl)
524                    }
525                }
526            }
527        }
528    }
529
530    fn on_stage_error(
531        &mut self,
532        stage_id: StageId,
533        prev_checkpoint: Option<StageCheckpoint>,
534        err: StageError,
535    ) -> Result<Option<ControlFlow>, PipelineError> {
536        if let StageError::DetachedHead { local_head, header, error } = err {
537            warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
538
539            if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
540                if local_head.block.hash == last_detached_head_unwind_target &&
541                    header.block.number == local_head.block.number + 1
542                {
543                    self.detached_head_attempts += 1;
544                } else {
545                    self.detached_head_attempts = 1;
546                }
547            } else {
548                self.detached_head_attempts = 1;
549            }
550
551            // We unwind because of a detached head.
552            let unwind_to = local_head
553                .block
554                .number
555                .saturating_sub(
556                    BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
557                )
558                .max(1);
559
560            self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
561            Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
562        } else if let StageError::Block { block, error } = err {
563            match error {
564                BlockErrorKind::Validation(validation_error) => {
565                    error!(
566                        target: "sync::pipeline",
567                        stage = %stage_id,
568                        bad_block = %block.block.number,
569                        "Stage encountered a validation error: {validation_error}"
570                    );
571
572                    // FIXME: When handling errors, we do not commit the database transaction. This
573                    // leads to the Merkle stage not clearing its checkpoint, and restarting from an
574                    // invalid place.
575                    let provider_rw = self.provider_factory.database_provider_rw()?;
576                    provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
577                    provider_rw.save_stage_checkpoint(
578                        StageId::MerkleExecute,
579                        prev_checkpoint.unwrap_or_default(),
580                    )?;
581
582                    provider_rw.commit()?;
583
584                    // We unwind because of a validation error. If the unwind itself
585                    // fails, we bail entirely,
586                    // otherwise we restart the execution loop from the
587                    // beginning.
588                    Ok(Some(ControlFlow::Unwind {
589                        target: prev_checkpoint.unwrap_or_default().block_number,
590                        bad_block: block,
591                    }))
592                }
593                BlockErrorKind::Execution(execution_error) => {
594                    error!(
595                        target: "sync::pipeline",
596                        stage = %stage_id,
597                        bad_block = %block.block.number,
598                        "Stage encountered an execution error: {execution_error}"
599                    );
600
601                    // We unwind because of an execution error. If the unwind itself
602                    // fails, we bail entirely,
603                    // otherwise we restart
604                    // the execution loop from the beginning.
605                    Ok(Some(ControlFlow::Unwind {
606                        target: prev_checkpoint.unwrap_or_default().block_number,
607                        bad_block: block,
608                    }))
609                }
610            }
611        } else if let StageError::MissingStaticFileData { block, segment } = err {
612            error!(
613                target: "sync::pipeline",
614                stage = %stage_id,
615                bad_block = %block.block.number,
616                segment = %segment,
617                "Stage is missing static file data."
618            );
619
620            Ok(Some(ControlFlow::Unwind {
621                target: block.block.number.saturating_sub(1),
622                bad_block: block,
623            }))
624        } else if err.is_fatal() {
625            error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
626            Err(err.into())
627        } else {
628            // On other errors we assume they are recoverable if we discard the
629            // transaction and run the stage again.
630            warn!(
631                target: "sync::pipeline",
632                stage = %stage_id,
633                "Stage encountered a non-fatal error: {err}. Retrying..."
634            );
635            Ok(None)
636        }
637    }
638}
639
640impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
641    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642        f.debug_struct("Pipeline")
643            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
644            .field("max_block", &self.max_block)
645            .field("event_sender", &self.event_sender)
646            .field("fail_on_unwind", &self.fail_on_unwind)
647            .finish()
648    }
649}
650
651#[cfg(test)]
652mod tests {
653    use std::sync::atomic::Ordering;
654
655    use super::*;
656    use crate::{test_utils::TestStage, UnwindOutput};
657    use assert_matches::assert_matches;
658    use reth_consensus::ConsensusError;
659    use reth_errors::ProviderError;
660    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
661    use reth_prune::PruneModes;
662    use reth_testing_utils::generators::{self, random_block_with_parent};
663    use tokio_stream::StreamExt;
664
665    #[test]
666    fn record_progress_calculates_outliers() {
667        let mut progress = PipelineProgress::default();
668
669        progress.update(10);
670        assert_eq!(progress.minimum_block_number, Some(10));
671        assert_eq!(progress.maximum_block_number, Some(10));
672
673        progress.update(20);
674        assert_eq!(progress.minimum_block_number, Some(10));
675        assert_eq!(progress.maximum_block_number, Some(20));
676
677        progress.update(1);
678        assert_eq!(progress.minimum_block_number, Some(1));
679        assert_eq!(progress.maximum_block_number, Some(20));
680    }
681
682    #[test]
683    fn progress_ctrl_flow() {
684        let mut progress = PipelineProgress::default();
685
686        assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
687
688        progress.update(1);
689        assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
690    }
691
692    /// Runs a simple pipeline.
693    #[tokio::test]
694    async fn run_pipeline() {
695        let provider_factory = create_test_provider_factory();
696
697        let stage_a = TestStage::new(StageId::Other("A"))
698            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
699        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
700        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
701
702        let stage_b = TestStage::new(StageId::Other("B"))
703            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
704        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
705        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
706
707        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
708            .add_stage(stage_a)
709            .add_stage(stage_b)
710            .with_max_block(10)
711            .build(
712                provider_factory.clone(),
713                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
714            );
715        let events = pipeline.events();
716
717        // Run pipeline
718        tokio::spawn(async move {
719            pipeline.run().await.unwrap();
720        });
721
722        // Check that the stages were run in order
723        assert_eq!(
724            events.collect::<Vec<PipelineEvent>>().await,
725            vec![
726                PipelineEvent::Prepare {
727                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
728                    stage_id: StageId::Other("A"),
729                    checkpoint: None,
730                    target: Some(10),
731                },
732                PipelineEvent::Run {
733                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
734                    stage_id: StageId::Other("A"),
735                    checkpoint: None,
736                    target: Some(10),
737                },
738                PipelineEvent::Ran {
739                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
740                    stage_id: StageId::Other("A"),
741                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
742                },
743                PipelineEvent::Prepare {
744                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
745                    stage_id: StageId::Other("B"),
746                    checkpoint: None,
747                    target: Some(10),
748                },
749                PipelineEvent::Run {
750                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
751                    stage_id: StageId::Other("B"),
752                    checkpoint: None,
753                    target: Some(10),
754                },
755                PipelineEvent::Ran {
756                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
757                    stage_id: StageId::Other("B"),
758                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
759                },
760            ]
761        );
762
763        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
764        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
765
766        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
767        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
768    }
769
770    /// Unwinds a simple pipeline.
771    #[tokio::test]
772    async fn unwind_pipeline() {
773        let provider_factory = create_test_provider_factory();
774
775        let stage_a = TestStage::new(StageId::Other("A"))
776            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
777            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
778        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
779        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
780
781        let stage_b = TestStage::new(StageId::Other("B"))
782            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
783            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
784        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
785        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
786
787        let stage_c = TestStage::new(StageId::Other("C"))
788            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
789            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
790        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
791        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
792
793        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
794            .add_stage(stage_a)
795            .add_stage(stage_b)
796            .add_stage(stage_c)
797            .with_max_block(10)
798            .build(
799                provider_factory.clone(),
800                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
801            );
802        let events = pipeline.events();
803
804        // Run pipeline
805        tokio::spawn(async move {
806            // Sync first
807            pipeline.run().await.expect("Could not run pipeline");
808
809            // Unwind
810            pipeline.unwind(1, None).expect("Could not unwind pipeline");
811        });
812
813        // Check that the stages were unwound in reverse order
814        assert_eq!(
815            events.collect::<Vec<PipelineEvent>>().await,
816            vec![
817                // Executing
818                PipelineEvent::Prepare {
819                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
820                    stage_id: StageId::Other("A"),
821                    checkpoint: None,
822                    target: Some(10),
823                },
824                PipelineEvent::Run {
825                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
826                    stage_id: StageId::Other("A"),
827                    checkpoint: None,
828                    target: Some(10),
829                },
830                PipelineEvent::Ran {
831                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
832                    stage_id: StageId::Other("A"),
833                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
834                },
835                PipelineEvent::Prepare {
836                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
837                    stage_id: StageId::Other("B"),
838                    checkpoint: None,
839                    target: Some(10),
840                },
841                PipelineEvent::Run {
842                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
843                    stage_id: StageId::Other("B"),
844                    checkpoint: None,
845                    target: Some(10),
846                },
847                PipelineEvent::Ran {
848                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
849                    stage_id: StageId::Other("B"),
850                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
851                },
852                PipelineEvent::Prepare {
853                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
854                    stage_id: StageId::Other("C"),
855                    checkpoint: None,
856                    target: Some(10),
857                },
858                PipelineEvent::Run {
859                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
860                    stage_id: StageId::Other("C"),
861                    checkpoint: None,
862                    target: Some(10),
863                },
864                PipelineEvent::Ran {
865                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
866                    stage_id: StageId::Other("C"),
867                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
868                },
869                // Unwinding
870                PipelineEvent::Unwind {
871                    stage_id: StageId::Other("C"),
872                    input: UnwindInput {
873                        checkpoint: StageCheckpoint::new(20),
874                        unwind_to: 1,
875                        bad_block: None
876                    }
877                },
878                PipelineEvent::Unwound {
879                    stage_id: StageId::Other("C"),
880                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
881                },
882                PipelineEvent::Unwind {
883                    stage_id: StageId::Other("B"),
884                    input: UnwindInput {
885                        checkpoint: StageCheckpoint::new(10),
886                        unwind_to: 1,
887                        bad_block: None
888                    }
889                },
890                PipelineEvent::Unwound {
891                    stage_id: StageId::Other("B"),
892                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
893                },
894                PipelineEvent::Unwind {
895                    stage_id: StageId::Other("A"),
896                    input: UnwindInput {
897                        checkpoint: StageCheckpoint::new(100),
898                        unwind_to: 1,
899                        bad_block: None
900                    }
901                },
902                PipelineEvent::Unwound {
903                    stage_id: StageId::Other("A"),
904                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
905                },
906            ]
907        );
908
909        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
910        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
911
912        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
913        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
914
915        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
916        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
917    }
918
919    /// Unwinds a pipeline with intermediate progress.
920    #[tokio::test]
921    async fn unwind_pipeline_with_intermediate_progress() {
922        let provider_factory = create_test_provider_factory();
923
924        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
925            .add_stage(
926                TestStage::new(StageId::Other("A"))
927                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
928                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
929            )
930            .add_stage(
931                TestStage::new(StageId::Other("B"))
932                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
933            )
934            .with_max_block(10)
935            .build(
936                provider_factory.clone(),
937                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
938            );
939        let events = pipeline.events();
940
941        // Run pipeline
942        tokio::spawn(async move {
943            // Sync first
944            pipeline.run().await.expect("Could not run pipeline");
945
946            // Unwind
947            pipeline.unwind(50, None).expect("Could not unwind pipeline");
948        });
949
950        // Check that the stages were unwound in reverse order
951        assert_eq!(
952            events.collect::<Vec<PipelineEvent>>().await,
953            vec![
954                // Executing
955                PipelineEvent::Prepare {
956                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
957                    stage_id: StageId::Other("A"),
958                    checkpoint: None,
959                    target: Some(10),
960                },
961                PipelineEvent::Run {
962                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
963                    stage_id: StageId::Other("A"),
964                    checkpoint: None,
965                    target: Some(10),
966                },
967                PipelineEvent::Ran {
968                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
969                    stage_id: StageId::Other("A"),
970                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
971                },
972                PipelineEvent::Prepare {
973                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
974                    stage_id: StageId::Other("B"),
975                    checkpoint: None,
976                    target: Some(10),
977                },
978                PipelineEvent::Run {
979                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
980                    stage_id: StageId::Other("B"),
981                    checkpoint: None,
982                    target: Some(10),
983                },
984                PipelineEvent::Ran {
985                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
986                    stage_id: StageId::Other("B"),
987                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
988                },
989                // Unwinding
990                // Nothing to unwind in stage "B"
991                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
992                PipelineEvent::Unwind {
993                    stage_id: StageId::Other("A"),
994                    input: UnwindInput {
995                        checkpoint: StageCheckpoint::new(100),
996                        unwind_to: 50,
997                        bad_block: None
998                    }
999                },
1000                PipelineEvent::Unwound {
1001                    stage_id: StageId::Other("A"),
1002                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
1003                },
1004            ]
1005        );
1006    }
1007
1008    /// Runs a pipeline that unwinds during sync.
1009    ///
1010    /// The flow is:
1011    ///
1012    /// - Stage A syncs to block 10
1013    /// - Stage B triggers an unwind, marking block 5 as bad
1014    /// - Stage B unwinds to its previous progress, block 0 but since it is still at block 0, it is
1015    ///   skipped entirely (there is nothing to unwind)
1016    /// - Stage A unwinds to its previous progress, block 0
1017    /// - Stage A syncs back up to block 10
1018    /// - Stage B syncs to block 10
1019    /// - The pipeline finishes
1020    #[tokio::test]
1021    async fn run_pipeline_with_unwind() {
1022        let provider_factory = create_test_provider_factory();
1023
1024        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1025            .add_stage(
1026                TestStage::new(StageId::Other("A"))
1027                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
1028                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1029                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1030            )
1031            .add_stage(
1032                TestStage::new(StageId::Other("B"))
1033                    .add_exec(Err(StageError::Block {
1034                        block: Box::new(random_block_with_parent(
1035                            &mut generators::rng(),
1036                            5,
1037                            Default::default(),
1038                        )),
1039                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
1040                    }))
1041                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1042                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1043            )
1044            .with_max_block(10)
1045            .build(
1046                provider_factory.clone(),
1047                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1048            );
1049        let events = pipeline.events();
1050
1051        // Run pipeline
1052        tokio::spawn(async move {
1053            pipeline.run().await.expect("Could not run pipeline");
1054        });
1055
1056        // Check that the stages were unwound in reverse order
1057        assert_eq!(
1058            events.collect::<Vec<PipelineEvent>>().await,
1059            vec![
1060                PipelineEvent::Prepare {
1061                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1062                    stage_id: StageId::Other("A"),
1063                    checkpoint: None,
1064                    target: Some(10),
1065                },
1066                PipelineEvent::Run {
1067                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1068                    stage_id: StageId::Other("A"),
1069                    checkpoint: None,
1070                    target: Some(10),
1071                },
1072                PipelineEvent::Ran {
1073                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1074                    stage_id: StageId::Other("A"),
1075                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1076                },
1077                PipelineEvent::Prepare {
1078                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1079                    stage_id: StageId::Other("B"),
1080                    checkpoint: None,
1081                    target: Some(10),
1082                },
1083                PipelineEvent::Run {
1084                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1085                    stage_id: StageId::Other("B"),
1086                    checkpoint: None,
1087                    target: Some(10),
1088                },
1089                PipelineEvent::Error { stage_id: StageId::Other("B") },
1090                PipelineEvent::Unwind {
1091                    stage_id: StageId::Other("A"),
1092                    input: UnwindInput {
1093                        checkpoint: StageCheckpoint::new(10),
1094                        unwind_to: 0,
1095                        bad_block: Some(5)
1096                    }
1097                },
1098                PipelineEvent::Unwound {
1099                    stage_id: StageId::Other("A"),
1100                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
1101                },
1102                PipelineEvent::Prepare {
1103                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1104                    stage_id: StageId::Other("A"),
1105                    checkpoint: Some(StageCheckpoint::new(0)),
1106                    target: Some(10),
1107                },
1108                PipelineEvent::Run {
1109                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1110                    stage_id: StageId::Other("A"),
1111                    checkpoint: Some(StageCheckpoint::new(0)),
1112                    target: Some(10),
1113                },
1114                PipelineEvent::Ran {
1115                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1116                    stage_id: StageId::Other("A"),
1117                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1118                },
1119                PipelineEvent::Prepare {
1120                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1121                    stage_id: StageId::Other("B"),
1122                    checkpoint: None,
1123                    target: Some(10),
1124                },
1125                PipelineEvent::Run {
1126                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1127                    stage_id: StageId::Other("B"),
1128                    checkpoint: None,
1129                    target: Some(10),
1130                },
1131                PipelineEvent::Ran {
1132                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1133                    stage_id: StageId::Other("B"),
1134                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1135                },
1136            ]
1137        );
1138    }
1139
1140    /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
1141    #[tokio::test]
1142    async fn pipeline_error_handling() {
1143        // Non-fatal
1144        let provider_factory = create_test_provider_factory();
1145        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1146            .add_stage(
1147                TestStage::new(StageId::Other("NonFatal"))
1148                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1149                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1150            )
1151            .with_max_block(10)
1152            .build(
1153                provider_factory.clone(),
1154                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1155            );
1156        let result = pipeline.run().await;
1157        assert_matches!(result, Ok(()));
1158
1159        // Fatal
1160        let provider_factory = create_test_provider_factory();
1161        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1162            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1163                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1164            )))
1165            .build(
1166                provider_factory.clone(),
1167                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1168            );
1169        let result = pipeline.run().await;
1170        assert_matches!(
1171            result,
1172            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1173                ProviderError::BlockBodyIndicesNotFound(5)
1174            )))
1175        );
1176    }
1177}