reth_stages_api/pipeline/
builder.rs
1use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet};
2use alloy_primitives::{BlockNumber, B256};
3use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
4use reth_static_file::StaticFileProducer;
5use tokio::sync::watch;
6
7#[must_use = "call `build` to construct the pipeline"]
9pub struct PipelineBuilder<Provider> {
10 stages: Vec<BoxedStage<Provider>>,
12 max_block: Option<BlockNumber>,
14 tip_tx: Option<watch::Sender<B256>>,
16 metrics_tx: Option<MetricEventsSender>,
17 fail_on_unwind: bool,
18}
19
20impl<Provider> PipelineBuilder<Provider> {
21 pub fn add_stage<S>(mut self, stage: S) -> Self
23 where
24 S: Stage<Provider> + 'static,
25 {
26 self.stages.push(Box::new(stage));
27 self
28 }
29
30 pub fn add_stages<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
38 let states = set.builder().build();
39 self.stages.reserve_exact(states.len());
40 for stage in states {
41 self.stages.push(stage);
42 }
43 self
44 }
45
46 pub const fn with_max_block(mut self, block: BlockNumber) -> Self {
50 self.max_block = Some(block);
51 self
52 }
53
54 pub fn with_tip_sender(mut self, tip_tx: watch::Sender<B256>) -> Self {
56 self.tip_tx = Some(tip_tx);
57 self
58 }
59
60 pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
62 self.metrics_tx = Some(metrics_tx);
63 self
64 }
65
66 pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
68 self.fail_on_unwind = yes;
69 self
70 }
71
72 pub fn build<N>(
74 self,
75 provider_factory: ProviderFactory<N>,
76 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
77 ) -> Pipeline<N>
78 where
79 N: ProviderNodeTypes,
80 ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
81 {
82 let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
83 Pipeline {
84 provider_factory,
85 stages,
86 max_block,
87 static_file_producer,
88 tip_tx,
89 event_sender: Default::default(),
90 progress: Default::default(),
91 metrics_tx,
92 fail_on_unwind,
93 }
94 }
95}
96
97impl<Provider> Default for PipelineBuilder<Provider> {
98 fn default() -> Self {
99 Self {
100 stages: Vec::new(),
101 max_block: None,
102 tip_tx: None,
103 metrics_tx: None,
104 fail_on_unwind: false,
105 }
106 }
107}
108
109impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("PipelineBuilder")
112 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
113 .field("max_block", &self.max_block)
114 .field("fail_on_unwind", &self.fail_on_unwind)
115 .finish()
116 }
117}