reth_stages_api/pipeline/
builder.rs

1use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet};
2use alloy_primitives::{BlockNumber, B256};
3use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
4use reth_static_file::StaticFileProducer;
5use tokio::sync::watch;
6
7/// Builds a [`Pipeline`].
8#[must_use = "call `build` to construct the pipeline"]
9pub struct PipelineBuilder<Provider> {
10    /// All configured stages in the order they will be executed.
11    stages: Vec<BoxedStage<Provider>>,
12    /// The maximum block number to sync to.
13    max_block: Option<BlockNumber>,
14    /// A receiver for the current chain tip to sync to.
15    tip_tx: Option<watch::Sender<B256>>,
16    metrics_tx: Option<MetricEventsSender>,
17    fail_on_unwind: bool,
18}
19
20impl<Provider> PipelineBuilder<Provider> {
21    /// Add a stage to the pipeline.
22    pub fn add_stage<S>(mut self, stage: S) -> Self
23    where
24        S: Stage<Provider> + 'static,
25    {
26        self.stages.push(Box::new(stage));
27        self
28    }
29
30    /// Add a set of stages to the pipeline.
31    ///
32    /// Stages can be grouped into a set by using a [`StageSet`].
33    ///
34    /// To customize the stages in the set (reorder, disable, insert a stage) call
35    /// [`builder`][StageSet::builder] on the set which will convert it to a
36    /// [`StageSetBuilder`][crate::StageSetBuilder].
37    pub fn add_stages<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
38        let states = set.builder().build();
39        self.stages.reserve_exact(states.len());
40        for stage in states {
41            self.stages.push(stage);
42        }
43        self
44    }
45
46    /// Set the target block.
47    ///
48    /// Once this block is reached, the pipeline will stop.
49    pub const fn with_max_block(mut self, block: BlockNumber) -> Self {
50        self.max_block = Some(block);
51        self
52    }
53
54    /// Set the tip sender.
55    pub fn with_tip_sender(mut self, tip_tx: watch::Sender<B256>) -> Self {
56        self.tip_tx = Some(tip_tx);
57        self
58    }
59
60    /// Set the metric events sender.
61    pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
62        self.metrics_tx = Some(metrics_tx);
63        self
64    }
65
66    /// Set whether pipeline should fail on unwind.
67    pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
68        self.fail_on_unwind = yes;
69        self
70    }
71
72    /// Builds the final [`Pipeline`] using the given database.
73    pub fn build<N>(
74        self,
75        provider_factory: ProviderFactory<N>,
76        static_file_producer: StaticFileProducer<ProviderFactory<N>>,
77    ) -> Pipeline<N>
78    where
79        N: ProviderNodeTypes,
80        ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
81    {
82        let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
83        Pipeline {
84            provider_factory,
85            stages,
86            max_block,
87            static_file_producer,
88            tip_tx,
89            event_sender: Default::default(),
90            progress: Default::default(),
91            metrics_tx,
92            fail_on_unwind,
93        }
94    }
95}
96
97impl<Provider> Default for PipelineBuilder<Provider> {
98    fn default() -> Self {
99        Self {
100            stages: Vec::new(),
101            max_block: None,
102            tip_tx: None,
103            metrics_tx: None,
104            fail_on_unwind: false,
105        }
106    }
107}
108
109impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
110    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111        f.debug_struct("PipelineBuilder")
112            .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
113            .field("max_block", &self.max_block)
114            .field("fail_on_unwind", &self.fail_on_unwind)
115            .finish()
116    }
117}