//! Staged sync pipeline (`reth_stages_api/pipeline/mod.rs`).
1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10    providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11    ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12    PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18    pin::Pin,
19    time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29    BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30    StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// The future that returns the owned pipeline and the result of the pipeline run. See
/// [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The pipeline type itself, paired with the result of [`Pipeline::run_as_fut`].
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline executes queued [stages][Stage] serially. An external component determines the tip
/// of the chain and the pipeline then executes each stage in order from the current local chain tip
/// and the external chain tip. When a stage is executed, it will run until it reaches the chain
/// tip.
///
/// After the entire pipeline has been run, it will run again unless asked to stop (see
/// [`Pipeline::set_max_block`]).
///
/// `include_mmd!("docs/mermaid/pipeline.mmd")`
///
/// # Unwinding
///
/// In case of a validation error (as determined by the consensus engine) in one of the stages, the
/// pipeline will unwind the stages in reverse order of execution. It is also possible to
/// request an unwind manually (see [`Pipeline::unwind`]).
///
/// # Defaults
///
/// The [`DefaultStages`](crate::sets::DefaultStages) are used to fully sync reth.
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Provider factory.
    provider_factory: ProviderFactory<N>,
    /// All configured stages in the order they will be executed.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// The maximum block number to sync to.
    max_block: Option<BlockNumber>,
    /// Moves data from the database to static files; see
    /// [`Pipeline::move_to_static_files`].
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender for events the pipeline emits.
    event_sender: EventSender<PipelineEvent>,
    /// Keeps track of the progress of the pipeline.
    progress: PipelineProgress,
    /// A Sender for the current chain tip to sync to.
    ///
    /// This is used to notify the headers stage about a new sync target.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional sender for stage-checkpoint metric events; metrics are skipped when `None`.
    metrics_tx: Option<MetricEventsSender>,
    /// Whether an unwind should fail the syncing process. Should only be set when downloading
    /// blocks from trusted sources and expecting them to be valid.
    fail_on_unwind: bool,
    /// Block that was chosen as a target of the last unwind triggered by
    /// [`StageError::DetachedHead`] error.
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive unwind attempts due to [`StageError::DetachedHead`] for the current
    /// fork.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98    /// Construct a pipeline using a [`PipelineBuilder`].
99    pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100    {
101        PipelineBuilder::default()
102    }
103
104    /// Return the minimum block number achieved by
105    /// any stage during the execution of the pipeline.
106    pub const fn minimum_block_number(&self) -> Option<u64> {
107        self.progress.minimum_block_number
108    }
109
110    /// Set tip for reverse sync.
111    #[track_caller]
112    pub fn set_tip(&self, tip: B256) {
113        let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114            warn!(target: "sync::pipeline", "Chain tip channel closed");
115        });
116    }
117
118    /// Listen for events on the pipeline.
119    pub fn events(&self) -> EventStream<PipelineEvent> {
120        self.event_sender.new_listener()
121    }
122
123    /// Get a mutable reference to a stage by index.
124    pub fn stage(
125        &mut self,
126        idx: usize,
127    ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128        &mut self.stages[idx]
129    }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133    /// Registers progress metrics for each registered stage
134    pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135        let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136        let provider = self.provider_factory.provider()?;
137
138        for stage in &self.stages {
139            let stage_id = stage.id();
140            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141                stage_id,
142                checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143                max_block_number: None,
144                elapsed: Duration::default(),
145            });
146        }
147        Ok(())
148    }
149
    /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the
    /// pipeline and its result as a future.
    #[track_caller]
    pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
        // Metrics registration is best-effort; a failure must not block syncing.
        let _ = self.register_metrics();
        Box::pin(async move {
            // NOTE: the tip should only be None if we are in continuous sync mode.
            if let Some(target) = target {
                match target {
                    PipelineTarget::Sync(tip) => self.set_tip(tip),
                    PipelineTarget::Unwind(target) => {
                        // Keep the database and static files consistent before unwinding.
                        if let Err(err) = self.move_to_static_files() {
                            return (self, Err(err.into()))
                        }
                        if let Err(err) = self.unwind(target, None) {
                            return (self, Err(err))
                        }
                        self.progress.update(target);

                        // An explicit unwind target bypasses the run loop entirely.
                        return (self, Ok(ControlFlow::Continue { block_number: target }))
                    }
                }
            }

            let result = self.run_loop().await;
            trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
            (self, result)
        })
    }
179
180    /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
181    /// a `max_block` in the pipeline.
182    pub async fn run(&mut self) -> Result<(), PipelineError> {
183        let _ = self.register_metrics(); // ignore error
184
185        loop {
186            let next_action = self.run_loop().await?;
187
188            if next_action.is_unwind() && self.fail_on_unwind {
189                return Err(PipelineError::UnexpectedUnwind)
190            }
191
192            // Terminate the loop early if it's reached the maximum user
193            // configured block.
194            if next_action.should_continue() &&
195                self.progress
196                    .minimum_block_number
197                    .zip(self.max_block)
198                    .is_some_and(|(progress, target)| progress >= target)
199            {
200                trace!(
201                    target: "sync::pipeline",
202                    ?next_action,
203                    minimum_block_number = ?self.progress.minimum_block_number,
204                    max_block = ?self.max_block,
205                    "Terminating pipeline."
206                );
207                return Ok(())
208            }
209        }
210    }
211
    /// Performs one pass of the pipeline across all stages. After successful
    /// execution of each stage, it proceeds to commit it to the database.
    ///
    /// If any stage is unsuccessful at execution, we proceed to
    /// unwind. This will undo the progress across the entire pipeline
    /// up to the block that caused the error.
    ///
    /// Returns the control flow after it ran the pipeline.
    /// This will be [`ControlFlow::Continue`] or [`ControlFlow::NoProgress`] of the _last_ stage in
    /// the pipeline (for example the `Finish` stage). Or [`ControlFlow::Unwind`] of the stage
    /// that caused the unwind.
    pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
        // Keep database and static files consistent before executing any stage.
        self.move_to_static_files()?;

        // Checkpoint of the previously executed stage; acts as the next stage's target.
        let mut previous_stage = None;
        for stage_index in 0..self.stages.len() {
            let stage = &self.stages[stage_index];
            let stage_id = stage.id();

            trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
            let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;

            trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");

            match next {
                ControlFlow::NoProgress { block_number } => {
                    if let Some(block_number) = block_number {
                        self.progress.update(block_number);
                    }
                }
                ControlFlow::Continue { block_number } => self.progress.update(block_number),
                ControlFlow::Unwind { target, bad_block } => {
                    // A stage requested an unwind: roll stages back, then surface the
                    // unwind to the caller so the outer loop can restart.
                    self.unwind(target, Some(bad_block.block.number))?;
                    return Ok(ControlFlow::Unwind { target, bad_block })
                }
            }

            // Re-read the persisted checkpoint rather than trusting in-memory state.
            previous_stage = Some(
                self.provider_factory
                    .provider()?
                    .get_stage_checkpoint(stage_id)?
                    .unwrap_or_default()
                    .block_number,
            );
        }

        Ok(self.progress.next_ctrl())
    }
260
261    /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move**
262    /// all data from the database to static files for corresponding
263    /// [segments](reth_static_file_types::StaticFileSegment), according to their [stage
264    /// checkpoints](StageCheckpoint):
265    /// - [`StaticFileSegment::Headers`](reth_static_file_types::StaticFileSegment::Headers) ->
266    ///   [`StageId::Headers`]
267    /// - [`StaticFileSegment::Receipts`](reth_static_file_types::StaticFileSegment::Receipts) ->
268    ///   [`StageId::Execution`]
269    /// - [`StaticFileSegment::Transactions`](reth_static_file_types::StaticFileSegment::Transactions)
270    ///   -> [`StageId::Bodies`]
271    ///
272    /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
273    /// lock is occupied.
274    pub fn move_to_static_files(&self) -> RethResult<()> {
275        // Copies data from database to static files
276        let lowest_static_file_height =
277            self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279        // Deletes data which has been copied to static files.
280        if let Some(prune_tip) = lowest_static_file_height {
281            // Run the pruner so we don't potentially end up with higher height in the database vs
282            // static files during a pipeline unwind
283            let mut pruner = PrunerBuilder::new(Default::default())
284                .delete_limit(usize::MAX)
285                .build_with_provider_factory(self.provider_factory.clone());
286
287            pruner.run(prune_tip)?;
288        }
289
290        Ok(())
291    }
292
    /// Unwind the stages to the target block (exclusive).
    ///
    /// If the unwind is due to a bad block the number of that block should be specified.
    pub fn unwind(
        &mut self,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> Result<(), PipelineError> {
        // Add validation before starting unwind
        let (latest_block, prune_modes, checkpoints) = {
            let provider = self.provider_factory.provider()?;
            (
                provider.last_block_number()?,
                provider.prune_modes_ref().clone(),
                provider.get_prune_checkpoints()?,
            )
        };
        // Refuse to unwind into a range that pruning has already deleted.
        prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;

        // Unwind stages in reverse order of execution
        let unwind_pipeline = self.stages.iter_mut().rev();

        // Legacy Engine: This prevents a race condition in which the `StaticFileProducer` could
        // attempt to proceed with a finalized block which has been unwinded
        let _locked_sf_producer = self.static_file_producer.lock();

        let mut provider_rw =
            self.provider_factory.unwind_provider_rw()?.disable_long_read_transaction_safety();

        for stage in unwind_pipeline {
            let stage_id = stage.id();
            let span = info_span!("Unwinding", stage = %stage_id);
            let _enter = span.enter();

            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
            // A stage already at or below the target has nothing to unwind.
            if checkpoint.block_number < to {
                debug!(
                    target: "sync::pipeline",
                    from = %checkpoint.block_number,
                    %to,
                    "Unwind point too far for stage"
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                continue
            }

            info!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                ?bad_block,
                "Starting unwind"
            );
            // A stage may unwind in batches; loop until its checkpoint reaches `to`.
            while checkpoint.block_number > to {
                let unwind_started_at = Instant::now();
                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

                let output = stage.unwind(&provider_rw, input);
                match output {
                    Ok(unwind_output) => {
                        checkpoint = unwind_output.checkpoint;
                        info!(
                            target: "sync::pipeline",
                            stage = %stage_id,
                            unwind_to = to,
                            progress = checkpoint.block_number,
                            done = checkpoint.block_number == to,
                            "Stage unwound"
                        );

                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                        // Notify event listeners and update metrics.
                        self.event_sender
                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                        if let Some(metrics_tx) = &mut self.metrics_tx {
                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                                stage_id,
                                checkpoint,
                                // We assume it was set in the previous execute iteration, so it
                                // doesn't change when we unwind.
                                max_block_number: None,
                                elapsed: unwind_started_at.elapsed(),
                            });
                        }

                        // update finalized and safe block if needed
                        let last_saved_finalized_block_number =
                            provider_rw.last_finalized_block_number()?;

                        // If None, that means the finalized block is not written so we should
                        // always save in that case
                        if last_saved_finalized_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_finalized_block_number
                        {
                            provider_rw.save_finalized_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;

                        if last_saved_safe_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_safe_block_number
                        {
                            provider_rw.save_safe_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        // Persist this batch, run the stage's post-commit hook, then open a
                        // fresh read-write provider for the next batch.
                        provider_rw.commit()?;

                        stage.post_unwind_commit()?;

                        provider_rw = self.provider_factory.unwind_provider_rw()?;
                    }
                    Err(err) => {
                        self.event_sender.notify(PipelineEvent::Error { stage_id });

                        // Any unwind failure is escalated to fatal: the database may be
                        // left partially unwound.
                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                    }
                }
            }
        }

        Ok(())
    }
423
    /// Executes the stage at `stage_index` repeatedly until it reports completion
    /// (`done == true`), committing after every successful batch.
    ///
    /// `previous_stage` is the checkpoint of the preceding stage; together with the
    /// configured `max_block` it forms this stage's execution target.
    async fn execute_stage_to_completion(
        &mut self,
        previous_stage: Option<BlockNumber>,
        stage_index: usize,
    ) -> Result<ControlFlow, PipelineError> {
        let total_stages = self.stages.len();

        let stage_id = self.stage(stage_index).id();
        let mut made_progress = false;
        // `max_block` takes precedence over the previous stage's checkpoint.
        let target = self.max_block.or(previous_stage);

        loop {
            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

            let stage_reached_max_block = prev_checkpoint
                .zip(self.max_block)
                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
            if stage_reached_max_block {
                warn!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    max_block = self.max_block,
                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
                    "Stage reached target block, skipping."
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                // We reached the maximum block, so we skip the stage
                return Ok(ControlFlow::NoProgress {
                    block_number: prev_checkpoint.map(|progress| progress.block_number),
                })
            }

            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

            self.event_sender.notify(PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            // Wait for the stage to be ready to execute; `None` from the error handler
            // means the error was transient, so retry from the top of the loop.
            if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
                self.event_sender.notify(PipelineEvent::Error { stage_id });
                match self.on_stage_error(stage_id, prev_checkpoint, err)? {
                    Some(ctrl) => return Ok(ctrl),
                    None => continue,
                };
            }

            let stage_started_at = Instant::now();
            let provider_rw = self.provider_factory.database_provider_rw()?;

            self.event_sender.notify(PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            match self.stage(stage_index).execute(&provider_rw, exec_input) {
                Ok(out @ ExecOutput { checkpoint, done }) => {
                    // Update stage checkpoint.
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    // Commit processed data to the database.
                    provider_rw.commit()?;

                    // Invoke stage post commit hook.
                    self.stage(stage_index).post_execute_commit()?;

                    // Notify event listeners and update metrics.
                    self.event_sender.notify(PipelineEvent::Ran {
                        pipeline_stages_progress: PipelineStagesProgress {
                            current: stage_index + 1,
                            total: total_stages,
                        },
                        stage_id,
                        result: out.clone(),
                    });
                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: target,
                            elapsed: stage_started_at.elapsed(),
                        });
                    }

                    let block_number = checkpoint.block_number;
                    let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
                    // Progress is sticky: one advancing batch marks the whole run as progress.
                    made_progress |= block_number != prev_block_number;
                    if done {
                        return Ok(if made_progress {
                            ControlFlow::Continue { block_number }
                        } else {
                            ControlFlow::NoProgress { block_number: Some(block_number) }
                        })
                    }
                }
                Err(err) => {
                    // Discard the uncommitted transaction before handling the error.
                    drop(provider_rw);
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
                        return Ok(ctrl)
                    }
                }
            }
        }
    }
541
    /// Decides how the pipeline reacts to a [`StageError`].
    ///
    /// Returns `Ok(Some(ControlFlow::Unwind { .. }))` when the error calls for an
    /// unwind, `Ok(None)` when the error is considered transient and the stage should
    /// simply be retried, and `Err(_)` when the error is fatal.
    fn on_stage_error(
        &mut self,
        stage_id: StageId,
        prev_checkpoint: Option<StageCheckpoint>,
        err: StageError,
    ) -> Result<Option<ControlFlow>, PipelineError> {
        if let StageError::DetachedHead { local_head, header, error } = err {
            warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");

            // Track consecutive detached-head unwinds for the same fork so the unwind
            // depth can grow with each failed attempt; any other fork resets the count.
            if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
                if local_head.block.hash == last_detached_head_unwind_target &&
                    header.block.number == local_head.block.number + 1
                {
                    self.detached_head_attempts += 1;
                } else {
                    self.detached_head_attempts = 1;
                }
            } else {
                self.detached_head_attempts = 1;
            }

            // We unwind because of a detached head.
            let unwind_to = local_head
                .block
                .number
                .saturating_sub(
                    BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
                )
                .max(1);

            self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
            Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
        } else if let StageError::Block { block, error } = err {
            match error {
                BlockErrorKind::Validation(validation_error) => {
                    error!(
                        target: "sync::pipeline",
                        stage = %stage_id,
                        bad_block = %block.block.number,
                        "Stage encountered a validation error: {validation_error}"
                    );

                    // FIXME: When handling errors, we do not commit the database transaction. This
                    // leads to the Merkle stage not clearing its checkpoint, and restarting from an
                    // invalid place.
                    // Only reset MerkleExecute checkpoint if MerkleExecute itself failed
                    if stage_id == StageId::MerkleExecute {
                        let provider_rw = self.provider_factory.database_provider_rw()?;
                        provider_rw
                            .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
                        provider_rw.save_stage_checkpoint(
                            StageId::MerkleExecute,
                            prev_checkpoint.unwrap_or_default(),
                        )?;

                        provider_rw.commit()?;
                    }

                    // We unwind because of a validation error. If the unwind itself
                    // fails, we bail entirely,
                    // otherwise we restart the execution loop from the
                    // beginning.
                    Ok(Some(ControlFlow::Unwind {
                        target: prev_checkpoint.unwrap_or_default().block_number,
                        bad_block: block,
                    }))
                }
                BlockErrorKind::Execution(execution_error) => {
                    error!(
                        target: "sync::pipeline",
                        stage = %stage_id,
                        bad_block = %block.block.number,
                        "Stage encountered an execution error: {execution_error}"
                    );

                    // We unwind because of an execution error. If the unwind itself
                    // fails, we bail entirely,
                    // otherwise we restart
                    // the execution loop from the beginning.
                    Ok(Some(ControlFlow::Unwind {
                        target: prev_checkpoint.unwrap_or_default().block_number,
                        bad_block: block,
                    }))
                }
            }
        } else if let StageError::MissingStaticFileData { block, segment } = err {
            error!(
                target: "sync::pipeline",
                stage = %stage_id,
                bad_block = %block.block.number,
                segment = %segment,
                "Stage is missing static file data."
            );

            // Unwind to just below the block whose static file data is missing.
            Ok(Some(ControlFlow::Unwind {
                target: block.block.number.saturating_sub(1),
                bad_block: block,
            }))
        } else if err.is_fatal() {
            error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
            Err(err.into())
        } else {
            // On other errors we assume they are recoverable if we discard the
            // transaction and run the stage again.
            warn!(
                target: "sync::pipeline",
                stage = %stage_id,
                "Stage encountered a non-fatal error: {err}. Retrying..."
            );
            Ok(None)
        }
    }
654}
655
656impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
657    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
658        f.debug_struct("Pipeline")
659            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
660            .field("max_block", &self.max_block)
661            .field("event_sender", &self.event_sender)
662            .field("fail_on_unwind", &self.fail_on_unwind)
663            .finish()
664    }
665}
666
667#[cfg(test)]
668mod tests {
669    use std::sync::atomic::Ordering;
670
671    use super::*;
672    use crate::{test_utils::TestStage, UnwindOutput};
673    use assert_matches::assert_matches;
674    use reth_consensus::ConsensusError;
675    use reth_errors::ProviderError;
676    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
677    use reth_prune::PruneModes;
678    use reth_testing_utils::generators::{self, random_block_with_parent};
679    use tokio_stream::StreamExt;
680
681    #[test]
682    fn record_progress_calculates_outliers() {
683        let mut progress = PipelineProgress::default();
684
685        progress.update(10);
686        assert_eq!(progress.minimum_block_number, Some(10));
687        assert_eq!(progress.maximum_block_number, Some(10));
688
689        progress.update(20);
690        assert_eq!(progress.minimum_block_number, Some(10));
691        assert_eq!(progress.maximum_block_number, Some(20));
692
693        progress.update(1);
694        assert_eq!(progress.minimum_block_number, Some(1));
695        assert_eq!(progress.maximum_block_number, Some(20));
696    }
697
    /// `next_ctrl` reports `NoProgress` until any block number has been recorded,
    /// then `Continue` from the recorded block.
    #[test]
    fn progress_ctrl_flow() {
        let mut progress = PipelineProgress::default();

        assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });

        progress.update(1);
        assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
    }
707
    /// Runs a simple pipeline.
    ///
    /// Two stages ("A" then "B") each succeed on their first execution; the test
    /// asserts the exact event sequence and that each stage committed exactly once
    /// without ever unwinding.
    #[tokio::test]
    async fn run_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Stage A reports checkpoint 20 even though the pipeline target below is 10.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        // Subscribe before spawning the run so no event is missed.
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.unwrap();
        });

        // Check that the stages were run in order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );

        // Each stage committed its execution exactly once and never unwound.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
    }
785
    /// Unwinds a simple pipeline.
    ///
    /// Three stages ("A", "B", "C") sync once, then the whole pipeline is unwound
    /// to block 1. The test asserts that execution events arrive in stage order
    /// and unwind events arrive in reverse stage order (C, B, A).
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Each stage is scripted with one successful execution and one
        // successful unwind down to checkpoint 1.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let stage_c = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        // Subscribe before spawning the run so no event is missed.
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind everything back to block 1.
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        // Check that the stages were unwound in reverse order
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                // Unwinding — reverse stage order: C, then B, then A.
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("C"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(20),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("C"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("B"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("B"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
            ]
        );

        // Every stage committed exactly one execution and exactly one unwind.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }
934
    /// Unwinds a pipeline with intermediate progress.
    ///
    /// Stage A syncs to block 100 and stage B to block 10; the pipeline is then
    /// unwound to block 50. Stage B is already at or below the unwind target, so
    /// it is skipped; only stage A actually unwinds.
    #[tokio::test]
    async fn unwind_pipeline_with_intermediate_progress() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
            )
            .add_stage(
                // Stage B has no scripted unwind: it must never be asked to unwind.
                TestStage::new(StageId::Other("B"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        // Subscribe before spawning the run so no event is missed.
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Unwind to block 50 — above stage B's progress, below stage A's.
            pipeline.unwind(50, None).expect("Could not unwind pipeline");
        });

        // Check that stage B is skipped during the unwind and only stage A unwinds
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // Unwinding
                // Nothing to unwind in stage "B"
                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 50,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
                },
            ]
        );
    }
1023
    /// Runs a pipeline that unwinds during sync.
    ///
    /// The flow is:
    ///
    /// - Stage A syncs to block 10
    /// - Stage B triggers an unwind, marking block 5 as bad
    /// - Stage B unwinds to its previous progress, block 0 but since it is still at block 0, it is
    ///   skipped entirely (there is nothing to unwind)
    /// - Stage A unwinds to its previous progress, block 0
    /// - Stage A syncs back up to block 10
    /// - Stage B syncs to block 10
    /// - The pipeline finishes
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                // Stage A: succeed, then unwind to 0, then succeed again on the retry.
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .add_stage(
                // Stage B: fail validation on block 5 first, then succeed on the retry.
                TestStage::new(StageId::Other("B"))
                    .add_exec(Err(StageError::Block {
                        block: Box::new(random_block_with_parent(
                            &mut generators::rng(),
                            5,
                            Default::default(),
                        )),
                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                    }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        // Subscribe before spawning the run so no event is missed.
        let events = pipeline.events();

        // Run pipeline
        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        // Check the full event sequence: initial run, error + unwind, then the re-run
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                // Stage B's validation failure marks block 5 as bad and triggers the unwind.
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 0,
                        bad_block: Some(5)
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
                },
                // Re-run after the unwind: stage A restarts from its unwound checkpoint.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );
    }
1155
1156    /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
1157    #[tokio::test]
1158    async fn pipeline_error_handling() {
1159        // Non-fatal
1160        let provider_factory = create_test_provider_factory();
1161        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1162            .add_stage(
1163                TestStage::new(StageId::Other("NonFatal"))
1164                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1165                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1166            )
1167            .with_max_block(10)
1168            .build(
1169                provider_factory.clone(),
1170                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1171            );
1172        let result = pipeline.run().await;
1173        assert_matches!(result, Ok(()));
1174
1175        // Fatal
1176        let provider_factory = create_test_provider_factory();
1177        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1178            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1179                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1180            )))
1181            .build(
1182                provider_factory.clone(),
1183                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1184            );
1185        let result = pipeline.run().await;
1186        assert_matches!(
1187            result,
1188            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1189                ProviderError::BlockBodyIndicesNotFound(5)
1190            )))
1191        );
1192    }
1193}