reth_node_builder/
setup.rs

1//! Helpers for setting up parts of the node.
2
3use std::sync::Arc;
4
5use crate::BlockTy;
6use alloy_primitives::{BlockNumber, B256};
7use reth_config::{config::StageConfig, PruneConfig};
8use reth_consensus::{ConsensusError, FullConsensus};
9use reth_downloaders::{
10    bodies::bodies::BodiesDownloaderBuilder,
11    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
12};
13use reth_evm::ConfigureEvm;
14use reth_exex::ExExManagerHandle;
15use reth_network_p2p::{
16    bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
17};
18use reth_node_api::HeaderTy;
19use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
20use reth_stages::{
21    prelude::DefaultStages,
22    stages::{EraImportSource, ExecutionStage},
23    Pipeline, StageSet,
24};
25use reth_static_file::StaticFileProducer;
26use reth_tasks::TaskExecutor;
27use reth_tracing::tracing::debug;
28use tokio::sync::watch;
29
30/// Constructs a [Pipeline] that's wired to the network
31#[expect(clippy::too_many_arguments)]
32pub fn build_networked_pipeline<N, Client, Evm>(
33    config: &StageConfig,
34    client: Client,
35    consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
36    provider_factory: ProviderFactory<N>,
37    task_executor: &TaskExecutor,
38    metrics_tx: reth_stages::MetricEventsSender,
39    prune_config: Option<PruneConfig>,
40    max_block: Option<BlockNumber>,
41    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
42    evm_config: Evm,
43    exex_manager_handle: ExExManagerHandle<N::Primitives>,
44    era_import_source: Option<EraImportSource>,
45) -> eyre::Result<Pipeline<N>>
46where
47    N: ProviderNodeTypes,
48    Client: BlockClient<Block = BlockTy<N>> + 'static,
49    Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
50{
51    // building network downloaders using the fetch client
52    let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
53        .build(client.clone(), consensus.clone())
54        .into_task_with(task_executor);
55
56    let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
57        .build(client, consensus.clone(), provider_factory.clone())
58        .into_task_with(task_executor);
59
60    let pipeline = build_pipeline(
61        provider_factory,
62        config,
63        header_downloader,
64        body_downloader,
65        consensus,
66        max_block,
67        metrics_tx,
68        prune_config,
69        static_file_producer,
70        evm_config,
71        exex_manager_handle,
72        era_import_source,
73    )?;
74
75    Ok(pipeline)
76}
77
78/// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders.
79#[expect(clippy::too_many_arguments)]
80pub fn build_pipeline<N, H, B, Evm>(
81    provider_factory: ProviderFactory<N>,
82    stage_config: &StageConfig,
83    header_downloader: H,
84    body_downloader: B,
85    consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
86    max_block: Option<u64>,
87    metrics_tx: reth_stages::MetricEventsSender,
88    prune_config: Option<PruneConfig>,
89    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
90    evm_config: Evm,
91    exex_manager_handle: ExExManagerHandle<N::Primitives>,
92    era_import_source: Option<EraImportSource>,
93) -> eyre::Result<Pipeline<N>>
94where
95    N: ProviderNodeTypes,
96    H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
97    B: BodyDownloader<Block = BlockTy<N>> + 'static,
98    Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
99{
100    let mut builder = Pipeline::<N>::builder();
101
102    if let Some(max_block) = max_block {
103        debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
104        builder = builder.with_max_block(max_block)
105    }
106
107    let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
108
109    let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
110
111    let pipeline = builder
112        .with_tip_sender(tip_tx)
113        .with_metrics_tx(metrics_tx)
114        .add_stages(
115            DefaultStages::new(
116                provider_factory.clone(),
117                tip_rx,
118                Arc::clone(&consensus),
119                header_downloader,
120                body_downloader,
121                evm_config.clone(),
122                stage_config.clone(),
123                prune_modes,
124                era_import_source,
125            )
126            .set(ExecutionStage::new(
127                evm_config,
128                consensus,
129                stage_config.execution.into(),
130                stage_config.execution_external_clean_threshold(),
131                exex_manager_handle,
132            )),
133        )
134        .build(provider_factory, static_file_producer);
135
136    Ok(pipeline)
137}