1use crate::{args::NetworkArgs, utils::get_single_header};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockNumber, B256};
6use clap::Parser;
7use futures::StreamExt;
8use reth_chainspec::ChainSpec;
9use reth_cli::chainspec::ChainSpecParser;
10use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::Config;
14use reth_consensus::FullConsensus;
15use reth_db::DatabaseEnv;
16use reth_downloaders::{
17 bodies::bodies::BodiesDownloaderBuilder,
18 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
19};
20use reth_errors::ConsensusError;
21use reth_ethereum_primitives::EthPrimitives;
22use reth_exex::ExExManagerHandle;
23use reth_network::{BlockDownloaderProvider, NetworkHandle};
24use reth_network_api::NetworkInfo;
25use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
26use reth_node_api::NodeTypesWithDBAdapter;
27use reth_node_ethereum::{consensus::EthBeaconConsensus, EthExecutorProvider};
28use reth_node_events::node::NodeEvent;
29use reth_provider::{
30 providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
31};
32use reth_prune::PruneModes;
33use reth_stages::{
34 sets::DefaultStages, stages::ExecutionStage, ExecutionStageThresholds, Pipeline, StageId,
35 StageSet,
36};
37use reth_static_file::StaticFileProducer;
38use reth_tasks::TaskExecutor;
39use std::{path::PathBuf, sync::Arc};
40use tokio::sync::watch;
41use tracing::*;
42
43#[derive(Debug, Parser)]
45pub struct Command<C: ChainSpecParser> {
46 #[command(flatten)]
47 env: EnvironmentArgs<C>,
48
49 #[command(flatten)]
50 network: NetworkArgs,
51
52 #[arg(long)]
54 pub to: u64,
55
56 #[arg(long, default_value = "1000")]
59 pub interval: u64,
60}
61
62impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
63 fn build_pipeline<N, Client>(
64 &self,
65 config: &Config,
66 client: Client,
67 consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
68 provider_factory: ProviderFactory<N>,
69 task_executor: &TaskExecutor,
70 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
71 ) -> eyre::Result<Pipeline<N>>
72 where
73 N: ProviderNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives> + CliNodeTypes,
74 Client: EthBlockClient + 'static,
75 {
76 let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
78 .build(client.clone(), consensus.clone().as_header_validator())
79 .into_task_with(task_executor);
80
81 let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
82 .build(client, consensus.clone().as_consensus(), provider_factory.clone())
83 .into_task_with(task_executor);
84
85 let stage_conf = &config.stages;
86 let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
87
88 let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
89 let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec());
90
91 let pipeline = Pipeline::<N>::builder()
92 .with_tip_sender(tip_tx)
93 .add_stages(
94 DefaultStages::new(
95 provider_factory.clone(),
96 tip_rx,
97 consensus.clone(),
98 header_downloader,
99 body_downloader,
100 executor.clone(),
101 stage_conf.clone(),
102 prune_modes,
103 )
104 .set(ExecutionStage::new(
105 executor,
106 consensus.clone(),
107 ExecutionStageThresholds {
108 max_blocks: None,
109 max_changes: None,
110 max_cumulative_gas: None,
111 max_duration: None,
112 },
113 stage_conf.execution_external_clean_threshold(),
114 ExExManagerHandle::empty(),
115 )),
116 )
117 .build(provider_factory, static_file_producer);
118
119 Ok(pipeline)
120 }
121
122 async fn build_network<
123 N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives>,
124 >(
125 &self,
126 config: &Config,
127 task_executor: TaskExecutor,
128 provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
129 network_secret_path: PathBuf,
130 default_peers_path: PathBuf,
131 ) -> eyre::Result<NetworkHandle> {
132 let secret_key = get_secret_key(&network_secret_path)?;
133 let network = self
134 .network
135 .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
136 .with_task_executor(Box::new(task_executor))
137 .build(provider_factory)
138 .start_network()
139 .await?;
140 info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
141 debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
142 Ok(network)
143 }
144
145 async fn fetch_block_hash<Client>(
146 &self,
147 client: Client,
148 block: BlockNumber,
149 ) -> eyre::Result<B256>
150 where
151 Client: HeadersClient<Header: reth_primitives_traits::BlockHeader>,
152 {
153 info!(target: "reth::cli", ?block, "Fetching block from the network.");
154 loop {
155 match get_single_header(&client, BlockHashOrNumber::Number(block)).await {
156 Ok(tip_header) => {
157 info!(target: "reth::cli", ?block, "Successfully fetched block");
158 return Ok(tip_header.hash())
159 }
160 Err(error) => {
161 error!(target: "reth::cli", ?block, %error, "Failed to fetch the block. Retrying...");
162 }
163 }
164 }
165 }
166
167 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives>>(
169 self,
170 ctx: CliContext,
171 ) -> eyre::Result<()> {
172 let Environment { provider_factory, config, data_dir } =
173 self.env.init::<N>(AccessRights::RW)?;
174
175 let consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>> =
176 Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
177
178 let network_secret_path =
180 self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
181 let network = self
182 .build_network(
183 &config,
184 ctx.task_executor.clone(),
185 provider_factory.clone(),
186 network_secret_path,
187 data_dir.known_peers(),
188 )
189 .await?;
190
191 let static_file_producer =
192 StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
193
194 let fetch_client = network.fetch_client().await?;
196 let mut pipeline = self.build_pipeline(
197 &config,
198 fetch_client.clone(),
199 consensus.clone(),
200 provider_factory.clone(),
201 &ctx.task_executor,
202 static_file_producer,
203 )?;
204
205 let provider = provider_factory.provider()?;
206
207 let latest_block_number =
208 provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
209 if latest_block_number.unwrap_or_default() >= self.to {
210 info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
211 return Ok(())
212 }
213
214 ctx.task_executor.spawn_critical(
215 "events task",
216 reth_node_events::node::handle_events(
217 Some(Box::new(network)),
218 latest_block_number,
219 pipeline.events().map(Into::<NodeEvent<N::Primitives>>::into),
220 ),
221 );
222
223 let mut current_max_block = latest_block_number.unwrap_or_default();
224 while current_max_block < self.to {
225 let next_block = current_max_block + 1;
226 let target_block = self.to.min(current_max_block + self.interval);
227 let target_block_hash =
228 self.fetch_block_hash(fetch_client.clone(), target_block).await?;
229
230 info!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, "Starting pipeline");
232 pipeline.set_tip(target_block_hash);
233 let result = pipeline.run_loop().await?;
234 trace!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, ?result, "Pipeline finished");
235
236 provider_factory.provider_rw()?.unwind_trie_state_range(next_block..=target_block)?;
238
239 current_max_block = target_block;
241 }
242
243 Ok(())
244 }
245}