reth_node_builder/
setup.rs

1//! Helpers for setting up parts of the node.
2
3use std::sync::Arc;
4
5use crate::BlockTy;
6use alloy_primitives::{BlockNumber, B256};
7use reth_config::{config::StageConfig, PruneConfig};
8use reth_consensus::{ConsensusError, FullConsensus};
9use reth_downloaders::{
10    bodies::bodies::BodiesDownloaderBuilder,
11    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
12};
13use reth_evm::execute::BlockExecutorProvider;
14use reth_exex::ExExManagerHandle;
15use reth_network_p2p::{
16    bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
17};
18use reth_node_api::HeaderTy;
19use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
20use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
21use reth_static_file::StaticFileProducer;
22use reth_tasks::TaskExecutor;
23use reth_tracing::tracing::debug;
24use tokio::sync::watch;
25
26/// Constructs a [Pipeline] that's wired to the network
27#[allow(clippy::too_many_arguments)]
28pub fn build_networked_pipeline<N, Client, Executor>(
29    config: &StageConfig,
30    client: Client,
31    consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
32    provider_factory: ProviderFactory<N>,
33    task_executor: &TaskExecutor,
34    metrics_tx: reth_stages::MetricEventsSender,
35    prune_config: Option<PruneConfig>,
36    max_block: Option<BlockNumber>,
37    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
38    executor: Executor,
39    exex_manager_handle: ExExManagerHandle<N::Primitives>,
40) -> eyre::Result<Pipeline<N>>
41where
42    N: ProviderNodeTypes,
43    Client: BlockClient<Block = BlockTy<N>> + 'static,
44    Executor: BlockExecutorProvider<Primitives = N::Primitives>,
45{
46    // building network downloaders using the fetch client
47    let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
48        .build(client.clone(), consensus.clone().as_header_validator())
49        .into_task_with(task_executor);
50
51    let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
52        .build(client, consensus.clone().as_consensus(), provider_factory.clone())
53        .into_task_with(task_executor);
54
55    let pipeline = build_pipeline(
56        provider_factory,
57        config,
58        header_downloader,
59        body_downloader,
60        consensus,
61        max_block,
62        metrics_tx,
63        prune_config,
64        static_file_producer,
65        executor,
66        exex_manager_handle,
67    )?;
68
69    Ok(pipeline)
70}
71
72/// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders.
73#[allow(clippy::too_many_arguments)]
74pub fn build_pipeline<N, H, B, Executor>(
75    provider_factory: ProviderFactory<N>,
76    stage_config: &StageConfig,
77    header_downloader: H,
78    body_downloader: B,
79    consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
80    max_block: Option<u64>,
81    metrics_tx: reth_stages::MetricEventsSender,
82    prune_config: Option<PruneConfig>,
83    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
84    executor: Executor,
85    exex_manager_handle: ExExManagerHandle<N::Primitives>,
86) -> eyre::Result<Pipeline<N>>
87where
88    N: ProviderNodeTypes,
89    H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
90    B: BodyDownloader<Block = BlockTy<N>> + 'static,
91    Executor: BlockExecutorProvider<Primitives = N::Primitives>,
92{
93    let mut builder = Pipeline::<N>::builder();
94
95    if let Some(max_block) = max_block {
96        debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
97        builder = builder.with_max_block(max_block)
98    }
99
100    let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
101
102    let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
103
104    let pipeline = builder
105        .with_tip_sender(tip_tx)
106        .with_metrics_tx(metrics_tx)
107        .add_stages(
108            DefaultStages::new(
109                provider_factory.clone(),
110                tip_rx,
111                Arc::clone(&consensus),
112                header_downloader,
113                body_downloader,
114                executor.clone(),
115                stage_config.clone(),
116                prune_modes,
117            )
118            .set(ExecutionStage::new(
119                executor,
120                consensus,
121                stage_config.execution.into(),
122                stage_config.execution_external_clean_threshold(),
123                exex_manager_handle,
124            )),
125        )
126        .build(provider_factory, static_file_producer);
127
128    Ok(pipeline)
129}