// reth_stages_api/pipeline/builder.rs
use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageId, StageSet};
use alloy_primitives::{BlockNumber, B256};
use reth_provider::{providers::ProviderNodeTypes, DatabaseProviderFactory, ProviderFactory};
use reth_static_file::StaticFileProducer;
use tokio::sync::watch;
/// Builder that accumulates stages and settings and turns them into a [`Pipeline`] via
/// [`PipelineBuilder::build`].
///
/// `Provider` is the provider type the boxed stages are parameterised over.
#[must_use = "call `build` to construct the pipeline"]
pub struct PipelineBuilder<Provider> {
/// Boxed stages, stored in the order they were appended.
stages: Vec<BoxedStage<Provider>>,
/// Optional block number handed to the pipeline's `max_block` field.
/// NOTE(review): presumably the highest block the pipeline will sync to — confirm in `Pipeline`.
max_block: Option<BlockNumber>,
/// Optional sending half of a `watch` channel carrying a `B256`.
/// NOTE(review): presumably the chain tip hash — confirm against the consumers of the channel.
tip_tx: Option<watch::Sender<B256>>,
/// Optional sender handed to the pipeline's `metrics_tx` field.
metrics_tx: Option<MetricEventsSender>,
/// Flag handed to the pipeline's `fail_on_unwind` field; `false` unless set explicitly.
fail_on_unwind: bool,
}
impl<Provider> PipelineBuilder<Provider> {
pub fn add_stage<S>(mut self, stage: S) -> Self
where
S: Stage<Provider> + 'static,
{
self.stages.push(Box::new(stage));
self
}
pub fn add_stages<Set: StageSet<Provider>>(mut self, set: Set) -> Self {
let states = set.builder().build();
self.stages.reserve_exact(states.len());
for stage in states {
self.stages.push(stage);
}
self
}
pub const fn with_max_block(mut self, block: BlockNumber) -> Self {
self.max_block = Some(block);
self
}
pub fn with_tip_sender(mut self, tip_tx: watch::Sender<B256>) -> Self {
self.tip_tx = Some(tip_tx);
self
}
pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
self.metrics_tx = Some(metrics_tx);
self
}
pub const fn with_fail_on_unwind(mut self, yes: bool) -> Self {
self.fail_on_unwind = yes;
self
}
pub fn build<N>(
self,
provider_factory: ProviderFactory<N>,
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
) -> Pipeline<N>
where
N: ProviderNodeTypes,
ProviderFactory<N>: DatabaseProviderFactory<ProviderRW = Provider>,
{
let Self { stages, max_block, tip_tx, metrics_tx, fail_on_unwind } = self;
Pipeline {
provider_factory,
stages,
max_block,
static_file_producer,
tip_tx,
event_sender: Default::default(),
progress: Default::default(),
metrics_tx,
fail_on_unwind,
}
}
}
// Implemented by hand rather than derived: `#[derive(Default)]` would add a `Provider: Default`
// bound, which none of the fields actually need.
impl<Provider> Default for PipelineBuilder<Provider> {
    /// Returns a builder with no stages, no optional settings, and `fail_on_unwind` disabled.
    fn default() -> Self {
        Self {
            stages: Vec::default(),
            max_block: Option::default(),
            tip_tx: Option::default(),
            metrics_tx: Option::default(),
            fail_on_unwind: bool::default(),
        }
    }
}
impl<Provider> std::fmt::Debug for PipelineBuilder<Provider> {
    /// Formats the builder as a struct showing the ids of the configured stages, `max_block` and
    /// `fail_on_unwind`.
    ///
    /// Stages are printed by id rather than by value; `tip_tx` and `metrics_tx` are left out of
    /// the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stage_ids: Vec<StageId> = self.stages.iter().map(|stage| stage.id()).collect();

        let mut out = f.debug_struct("PipelineBuilder");
        out.field("stages", &stage_ids);
        out.field("max_block", &self.max_block);
        out.field("fail_on_unwind", &self.fail_on_unwind);
        out.finish()
    }
}