1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//! Helpers for setting up parts of the node.

use std::sync::Arc;

use reth_config::{config::StageConfig, PruneConfig};
use reth_consensus::Consensus;
use reth_downloaders::{
    bodies::bodies::BodiesDownloaderBuilder,
    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_evm::execute::BlockExecutorProvider;
use reth_exex::ExExManagerHandle;
use reth_network_p2p::{
    bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
};
use reth_node_core::primitives::{BlockNumber, B256};
use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::debug;
use tokio::sync::watch;

/// Constructs a [Pipeline] that's wired to the network
#[allow(clippy::too_many_arguments)]
pub fn build_networked_pipeline<N, Client, Executor>(
    config: &StageConfig,
    client: Client,
    consensus: Arc<dyn Consensus>,
    provider_factory: ProviderFactory<N>,
    task_executor: &TaskExecutor,
    metrics_tx: reth_stages::MetricEventsSender,
    prune_config: Option<PruneConfig>,
    max_block: Option<BlockNumber>,
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    executor: Executor,
    exex_manager_handle: ExExManagerHandle,
) -> eyre::Result<Pipeline<N>>
where
    N: ProviderNodeTypes,
    Client: BlockClient + 'static,
    Executor: BlockExecutorProvider,
{
    // building network downloaders using the fetch client
    let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
        .build(client.clone(), Arc::clone(&consensus))
        .into_task_with(task_executor);

    let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
        .build(client, Arc::clone(&consensus), provider_factory.clone())
        .into_task_with(task_executor);

    let pipeline = build_pipeline(
        provider_factory,
        config,
        header_downloader,
        body_downloader,
        consensus,
        max_block,
        metrics_tx,
        prune_config,
        static_file_producer,
        executor,
        exex_manager_handle,
    )?;

    Ok(pipeline)
}

/// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders.
#[allow(clippy::too_many_arguments)]
pub fn build_pipeline<N, H, B, Executor>(
    provider_factory: ProviderFactory<N>,
    stage_config: &StageConfig,
    header_downloader: H,
    body_downloader: B,
    consensus: Arc<dyn Consensus>,
    max_block: Option<u64>,
    metrics_tx: reth_stages::MetricEventsSender,
    prune_config: Option<PruneConfig>,
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    executor: Executor,
    exex_manager_handle: ExExManagerHandle,
) -> eyre::Result<Pipeline<N>>
where
    N: ProviderNodeTypes,
    H: HeaderDownloader + 'static,
    B: BodyDownloader + 'static,
    Executor: BlockExecutorProvider,
{
    let mut builder = Pipeline::<N>::builder();

    if let Some(max_block) = max_block {
        debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
        builder = builder.with_max_block(max_block)
    }

    let (tip_tx, tip_rx) = watch::channel(B256::ZERO);

    let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();

    let pipeline = builder
        .with_tip_sender(tip_tx)
        .with_metrics_tx(metrics_tx)
        .add_stages(
            DefaultStages::new(
                provider_factory.clone(),
                tip_rx,
                Arc::clone(&consensus),
                header_downloader,
                body_downloader,
                executor.clone(),
                stage_config.clone(),
                prune_modes.clone(),
            )
            .set(ExecutionStage::new(
                executor,
                stage_config.execution.into(),
                stage_config.execution_external_clean_threshold(),
                prune_modes,
                exex_manager_handle,
            )),
        )
        .build(provider_factory, static_file_producer);

    Ok(pipeline)
}