1use std::sync::Arc;
4
5use crate::BlockTy;
6use alloy_primitives::{BlockNumber, B256};
7use reth_config::{config::StageConfig, PruneConfig};
8use reth_consensus::FullConsensus;
9use reth_downloaders::{
10 bodies::bodies::BodiesDownloaderBuilder,
11 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
12};
13use reth_evm::ConfigureEvm;
14use reth_exex::ExExManagerHandle;
15use reth_network_p2p::{
16 bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
17};
18use reth_node_api::HeaderTy;
19use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
20use reth_stages::{
21 prelude::DefaultStages,
22 stages::{EraImportSource, ExecutionStage},
23 Pipeline, StageId, StageSet,
24};
25use reth_static_file::StaticFileProducer;
26use reth_tasks::TaskExecutor;
27use reth_tracing::tracing::debug;
28use tokio::sync::watch;
29
30#[expect(clippy::too_many_arguments)]
32pub fn build_networked_pipeline<N, Client, Evm>(
33 config: &StageConfig,
34 client: Client,
35 consensus: Arc<dyn FullConsensus<N::Primitives>>,
36 provider_factory: ProviderFactory<N>,
37 task_executor: &TaskExecutor,
38 metrics_tx: reth_stages::MetricEventsSender,
39 prune_config: PruneConfig,
40 max_block: Option<BlockNumber>,
41 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
42 evm_config: Evm,
43 exex_manager_handle: ExExManagerHandle<N::Primitives>,
44 era_import_source: Option<EraImportSource>,
45 disabled_stages: &[StageId],
46) -> eyre::Result<Pipeline<N>>
47where
48 N: ProviderNodeTypes,
49 Client: BlockClient<Block = BlockTy<N>> + 'static,
50 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
51{
52 let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
54 .build(client.clone(), consensus.clone())
55 .into_task_with(task_executor);
56
57 let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
58 .build(client, consensus.clone(), provider_factory.clone())
59 .into_task_with(task_executor);
60
61 let pipeline = build_pipeline(
62 provider_factory,
63 config,
64 header_downloader,
65 body_downloader,
66 consensus,
67 max_block,
68 metrics_tx,
69 prune_config,
70 static_file_producer,
71 evm_config,
72 exex_manager_handle,
73 era_import_source,
74 disabled_stages,
75 )?;
76
77 Ok(pipeline)
78}
79
80#[expect(clippy::too_many_arguments)]
82pub fn build_pipeline<N, H, B, Evm>(
83 provider_factory: ProviderFactory<N>,
84 stage_config: &StageConfig,
85 header_downloader: H,
86 body_downloader: B,
87 consensus: Arc<dyn FullConsensus<N::Primitives>>,
88 max_block: Option<u64>,
89 metrics_tx: reth_stages::MetricEventsSender,
90 prune_config: PruneConfig,
91 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
92 evm_config: Evm,
93 exex_manager_handle: ExExManagerHandle<N::Primitives>,
94 era_import_source: Option<EraImportSource>,
95 disabled_stages: &[StageId],
96) -> eyre::Result<Pipeline<N>>
97where
98 N: ProviderNodeTypes,
99 H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
100 B: BodyDownloader<Block = BlockTy<N>> + 'static,
101 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
102{
103 let mut builder = Pipeline::<N>::builder();
104
105 if let Some(max_block) = max_block {
106 debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
107 builder = builder.with_max_block(max_block)
108 }
109
110 let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
111
112 let pipeline = builder
113 .with_tip_sender(tip_tx)
114 .with_metrics_tx(metrics_tx)
115 .add_stages(
116 DefaultStages::new(
117 provider_factory.clone(),
118 tip_rx,
119 Arc::clone(&consensus),
120 header_downloader,
121 body_downloader,
122 evm_config.clone(),
123 stage_config.clone(),
124 prune_config.segments,
125 era_import_source,
126 )
127 .set(ExecutionStage::new(
128 evm_config,
129 consensus,
130 stage_config.execution.into(),
131 stage_config.execution_external_clean_threshold(),
132 exex_manager_handle,
133 ))
134 .disable_all(disabled_stages),
135 )
136 .build(provider_factory, static_file_producer);
137
138 Ok(pipeline)
139}