reth_stages_api/pipeline/
builder.rs1use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet};
2use alloy_primitives::{BlockNumber, B256};
3use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
4use reth_static_file::StaticFileProducer;
5use tokio::sync::watch;
6
7#[must_use = "call `build` to construct the pipeline"]
9pub struct PipelineBuilder<Provider> {
10 stages: Vec<BoxedStage<Provider>>,
12 max_block: Option<BlockNumber>,
14 tip_tx: Option<watch::Sender<B256>>,
16 metrics_tx: Option<MetricEventsSender>,
17 fail_on_unwind: bool,
18}
19
20impl<Provider> PipelineBuilder<Provider> {
21 pub fn add_stage<S>(mut self, stage: S) -> Self
23 where
24 S: Stage<Provider> + 'static,
25 {
26 self.stages.push(Box::new(stage));
27 self
28 }
29
30 pub fn add_stages<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
38 let stages = set.builder().build();
39 self.stages.reserve(stages.len());
40 self.stages.extend(stages);
41 self
42 }
43
44 pub const fn with_max_block(mut self, block: BlockNumber) -> Self {
48 self.max_block = Some(block);
49 self
50 }
51
52 pub fn with_tip_sender(mut self, tip_tx: watch::Sender<B256>) -> Self {
54 self.tip_tx = Some(tip_tx);
55 self
56 }
57
58 pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
60 self.metrics_tx = Some(metrics_tx);
61 self
62 }
63
64 pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
66 self.fail_on_unwind = yes;
67 self
68 }
69
70 pub fn build<N>(
72 self,
73 provider_factory: ProviderFactory<N>,
74 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
75 ) -> Pipeline<N>
76 where
77 N: ProviderNodeTypes,
78 ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
79 {
80 let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
81 Pipeline {
82 provider_factory,
83 stages,
84 max_block,
85 static_file_producer,
86 tip_tx,
87 event_sender: Default::default(),
88 progress: Default::default(),
89 metrics_tx,
90 fail_on_unwind,
91 last_detached_head_unwind_target: None,
92 detached_head_attempts: 0,
93 }
94 }
95}
96
97impl<Provider> Default for PipelineBuilder<Provider> {
98 fn default() -> Self {
99 Self {
100 stages: Vec::new(),
101 max_block: None,
102 tip_tx: None,
103 metrics_tx: None,
104 fail_on_unwind: false,
105 }
106 }
107}
108
109impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
110 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 f.debug_struct("PipelineBuilder")
112 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
113 .field("max_block", &self.max_block)
114 .field("fail_on_unwind", &self.fail_on_unwind)
115 .finish()
116 }
117}