reth_stages_api/pipeline/
builder.rs

1use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet};
2use alloy_primitives::{BlockNumber, B256};
3use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
4use reth_static_file::StaticFileProducer;
5use tokio::sync::watch;
6
7/// Builds a [`Pipeline`].
8#[must_use = "call `build` to construct the pipeline"]
9pub struct PipelineBuilder<Provider> {
10    /// All configured stages in the order they will be executed.
11    stages: Vec<BoxedStage<Provider>>,
12    /// The maximum block number to sync to.
13    max_block: Option<BlockNumber>,
14    /// A Sender for the current chain tip to sync to.
15    tip_tx: Option<watch::Sender<B256>>,
16    metrics_tx: Option<MetricEventsSender>,
17    fail_on_unwind: bool,
18}
19
20impl<Provider> PipelineBuilder<Provider> {
21    /// Add a stage to the pipeline.
22    pub fn add_stage<S>(mut self, stage: S) -> Self
23    where
24        S: Stage<Provider> + 'static,
25    {
26        self.stages.push(Box::new(stage));
27        self
28    }
29
30    /// Add a set of stages to the pipeline.
31    ///
32    /// Stages can be grouped into a set by using a [`StageSet`].
33    ///
34    /// To customize the stages in the set (reorder, disable, insert a stage) call
35    /// [`builder`][StageSet::builder] on the set which will convert it to a
36    /// [`StageSetBuilder`][crate::StageSetBuilder].
37    pub fn add_stages<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
38        let stages = set.builder().build();
39        self.stages.reserve(stages.len());
40        self.stages.extend(stages);
41        self
42    }
43
44    /// Set the target block.
45    ///
46    /// Once this block is reached, the pipeline will stop.
47    pub const fn with_max_block(mut self, block: BlockNumber) -> Self {
48        self.max_block = Some(block);
49        self
50    }
51
52    /// Set the tip sender.
53    pub fn with_tip_sender(mut self, tip_tx: watch::Sender<B256>) -> Self {
54        self.tip_tx = Some(tip_tx);
55        self
56    }
57
58    /// Set the metric events sender.
59    pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
60        self.metrics_tx = Some(metrics_tx);
61        self
62    }
63
64    /// Set whether pipeline should fail on unwind.
65    pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
66        self.fail_on_unwind = yes;
67        self
68    }
69
70    /// Builds the final [`Pipeline`] using the given database.
71    pub fn build<N>(
72        self,
73        provider_factory: ProviderFactory<N>,
74        static_file_producer: StaticFileProducer<ProviderFactory<N>>,
75    ) -> Pipeline<N>
76    where
77        N: ProviderNodeTypes,
78        ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
79    {
80        let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
81        Pipeline {
82            provider_factory,
83            stages,
84            max_block,
85            static_file_producer,
86            tip_tx,
87            event_sender: Default::default(),
88            progress: Default::default(),
89            metrics_tx,
90            fail_on_unwind,
91            last_detached_head_unwind_target: None,
92            detached_head_attempts: 0,
93        }
94    }
95}
96
97impl<Provider> Default for PipelineBuilder<Provider> {
98    fn default() -> Self {
99        Self {
100            stages: Vec::new(),
101            max_block: None,
102            tip_tx: None,
103            metrics_tx: None,
104            fail_on_unwind: false,
105        }
106    }
107}
108
109impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("PipelineBuilder")
112            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
113            .field("max_block", &self.max_block)
114            .field("fail_on_unwind", &self.fail_on_unwind)
115            .finish()
116    }
117}