1use alloy_primitives::B256;
4use clap::Parser;
5use reth_chainspec::EthChainSpec;
6use reth_cli::chainspec::ChainSpecParser;
7use reth_config::{config::EtlConfig, Config};
8use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
9use reth_db::{init_db, open_db_read_only, DatabaseEnv};
10use reth_db_common::init::init_genesis;
11use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
12use reth_eth_wire::NetPrimitivesFor;
13use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
14use reth_network::NetworkEventListenerProvider;
15use reth_node_api::FullNodeTypesAdapter;
16use reth_node_builder::{
17 Node, NodeComponents, NodeComponentsBuilder, NodeTypes, NodeTypesWithDBAdapter,
18};
19use reth_node_core::{
20 args::{DatabaseArgs, DatadirArgs},
21 dirs::{ChainPath, DataDirPath},
22};
23use reth_provider::{
24 providers::{BlockchainProvider, NodeTypesForProvider, StaticFileProvider},
25 ProviderFactory, StaticFileProviderFactory,
26};
27use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
28use reth_static_file::StaticFileProducer;
29use std::{path::PathBuf, sync::Arc};
30use tokio::sync::watch;
31use tracing::{debug, info, warn};
32
// Shared command-line arguments that locate and describe a node's storage: the data
// directory, an optional config file, the chain spec and the database options.
//
// `EnvironmentArgs::init` consumes these to open the storage.
//
// NOTE(review): this block deliberately uses plain `//` comments. clap's derive macros read
// `///` doc comments as help/about text, so adding doc comments here would change the
// generated `--help` output.
#[derive(Debug, Parser)]
pub struct EnvironmentArgs<C: ChainSpecParser> {
    // Data directory options, flattened into the enclosing command. `init` resolves them
    // against the selected chain to obtain the actual data directory.
    #[command(flatten)]
    pub datadir: DatadirArgs,

    // `--config <FILE>`: optional path to the configuration file. When absent, `init` falls
    // back to the config path provided by the resolved data directory.
    #[arg(long, value_name = "FILE")]
    pub config: Option<PathBuf>,

    // `--chain <CHAIN_OR_PATH>`: the chain spec to operate on. The value is parsed by
    // `C::parser()`, defaults to the first entry of `C::SUPPORTED_CHAINS`, takes its long help
    // from `C::help_message()`, and is declared `global`.
    #[arg(
        long,
        value_name = "CHAIN_OR_PATH",
        long_help = C::help_message(),
        default_value = C::SUPPORTED_CHAINS[0],
        value_parser = C::parser(),
        global = true
    )]
    pub chain: Arc<C::ChainSpec>,

    // Database options, flattened into the enclosing command. `init` passes
    // `self.db.database_args()` to the database open/init calls.
    #[command(flatten)]
    pub db: DatabaseArgs,
}
61
impl<C: ChainSpecParser> EnvironmentArgs<C> {
    /// Opens the node's storage according to `access` and returns the resulting
    /// [`Environment`].
    ///
    /// Steps, in order:
    /// 1. Resolve the data directory for the configured chain and, in read-write mode, create
    ///    the database and static-file directories.
    /// 2. Load the config from `--config` (or the data directory's config path). A load failure
    ///    is logged as a warning and the default [`Config`] is used instead.
    /// 3. Open the database and the static files, read-write or read-only.
    /// 4. Build the provider factory, which also runs the storage consistency check.
    /// 5. In read-write mode, initialize genesis.
    ///
    /// # Errors
    ///
    /// Propagates errors from directory creation, from opening the database or static files,
    /// from building the provider factory, and from genesis initialization.
    pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
    where
        C: ChainSpecParser<ChainSpec = N::ChainSpec>,
    {
        let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
        let db_path = data_dir.db();
        let sf_path = data_dir.static_files();

        // Directories are only created when we are allowed to write; a read-only open must not
        // touch the filesystem layout.
        if access.is_read_write() {
            reth_fs_util::create_dir_all(&db_path)?;
            reth_fs_util::create_dir_all(&sf_path)?;
        }

        // An explicit `--config` wins over the data directory's config path.
        let config_path = self.config.clone().unwrap_or_else(|| data_dir.config());

        // Best effort: a config that fails to load is reported and replaced by the defaults
        // rather than aborting the command.
        let mut config = Config::from_path(config_path)
            .inspect_err(
                |err| warn!(target: "reth::cli", %err, "Failed to load config file, using default"),
            )
            .unwrap_or_default();

        // When the config does not set an ETL directory, derive one from the data directory.
        if config.stages.etl.dir.is_none() {
            config.stages.etl.dir = Some(EtlConfig::from_datadir(data_dir.data_dir()));
        }
        // Likewise, when no ERA folder is configured, base the ERA config on the data directory.
        if config.stages.era.folder.is_none() {
            config.stages.era = config.stages.era.with_datadir(data_dir.data_dir());
        }

        info!(target: "reth::cli", ?db_path, ?sf_path, "Opening storage");
        // Database and static files are always opened with the same access mode.
        let (db, sfp) = match access {
            AccessRights::RW => (
                Arc::new(init_db(db_path, self.db.database_args())?),
                StaticFileProvider::read_write(sf_path)?,
            ),
            AccessRights::RO => (
                Arc::new(open_db_read_only(&db_path, self.db.database_args())?),
                StaticFileProvider::read_only(sf_path, false)?,
            ),
        };

        let provider_factory = self.create_provider_factory(&config, db, sfp)?;
        // Genesis initialization is skipped for read-only access.
        if access.is_read_write() {
            debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
            init_genesis(&provider_factory)?;
        }

        Ok(Environment { config, provider_factory, data_dir })
    }

    /// Builds a [`ProviderFactory`] over `db` and `static_file_provider` and checks that the
    /// two stores are consistent with each other.
    ///
    /// When the consistency check reports an unwind target:
    /// - if the database is read-only, a warning is logged and the factory is returned as is;
    /// - otherwise a pipeline wired with no-op consensus, downloaders and EVM config is built
    ///   and used to unwind to that target.
    ///
    /// # Panics
    ///
    /// Panics if the reported target is `PipelineTarget::Unwind(0)`, or if the target does not
    /// yield an unwind block (`"should exist"`).
    ///
    /// # Errors
    ///
    /// Propagates errors from obtaining a provider, from the consistency check, from the
    /// read-only query, and from the pipeline operations.
    fn create_provider_factory<N: CliNodeTypes>(
        &self,
        config: &Config,
        db: Arc<DatabaseEnv>,
        static_file_provider: StaticFileProvider<N::Primitives>,
    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>>
    where
        C: ChainSpecParser<ChainSpec = N::ChainSpec>,
    {
        // Both values come from the optional prune section of the config; without one there is
        // no receipt pruning and the prune modes take their default value.
        let has_receipt_pruning = config.prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
        let prune_modes =
            config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default();
        let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::new(
            db,
            self.chain.clone(),
            static_file_provider,
        )
        .with_prune_modes(prune_modes.clone());

        // `Some(target)` means the static files and the database disagree and the storage has
        // to be unwound to `target` to become consistent again.
        if let Some(unwind_target) = factory
            .static_file_provider()
            .check_consistency(&factory.provider()?, has_receipt_pruning)?
        {
            // Healing requires writes, so with a read-only database we can only report it.
            if factory.db_ref().is_read_only()? {
                warn!(target: "reth::cli", ?unwind_target, "Inconsistent storage. Restart node to heal.");
                return Ok(factory)
            }

            // Refuse to automatically unwind all the way back to block 0: panic instead.
            assert_ne!(
                unwind_target,
                PipelineTarget::Unwind(0),
                "A static file <> database inconsistency was found that would trigger an unwind to block 0"
            );

            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

            // The sender is bound to a named `_tip_tx` (not `_`) so it stays alive until the end
            // of this scope; only the receiver is handed to the stages.
            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

            // All network/execution components are no-ops: this pipeline is only used for the
            // two calls below, never to sync forward.
            let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::builder()
                .add_stages(DefaultStages::new(
                    factory.clone(),
                    tip_rx,
                    Arc::new(NoopConsensus::default()),
                    NoopHeaderDownloader::default(),
                    NoopBodiesDownloader::default(),
                    NoopEvmConfig::<N::Evm>::default(),
                    config.stages.clone(),
                    prune_modes.clone(),
                    None,
                ))
                .build(factory.clone(), StaticFileProducer::new(factory.clone(), prune_modes));

            // NOTE(review): presumably moves eligible data from the database into static files
            // before unwinding — confirm against `Pipeline::move_to_static_files`.
            pipeline.move_to_static_files()?;
            pipeline.unwind(unwind_target.unwind_target().expect("should exist"), None)?;
        }

        Ok(factory)
    }
}
184
/// The opened node environment produced by `EnvironmentArgs::init`.
#[derive(Debug)]
pub struct Environment<N: NodeTypes> {
    /// The configuration loaded from the config file, or the default one if loading failed.
    pub config: Config,
    /// Provider factory over the opened database and static files.
    pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
    /// The data directory resolved for the selected chain.
    pub data_dir: ChainPath<DataDirPath>,
}
195
/// The access mode used when opening the node's storage.
#[derive(Debug, Copy, Clone)]
pub enum AccessRights {
    /// Storage is opened for reading and writing.
    RW,
    /// Storage is opened for reading only.
    RO,
}

impl AccessRights {
    /// Reports whether this mode permits writes, i.e. whether it is [`AccessRights::RW`].
    pub const fn is_read_write(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly which side it is on.
        match *self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
211
/// Shorthand for the [`FullNodeTypesAdapter`] of node types `T`, instantiated with
/// `Arc<DatabaseEnv>` and a [`BlockchainProvider`] over the same node types and database
/// handle.
type FullTypesAdapter<T> = FullNodeTypesAdapter<
    T,
    Arc<DatabaseEnv>,
    BlockchainProvider<NodeTypesWithDBAdapter<T, Arc<DatabaseEnv>>>,
>;
218
/// Abstraction over header types whose number can be overwritten in place.
pub trait CliHeader {
    /// Sets this header's number to `number`.
    fn set_number(&mut self, number: u64);
}
223
impl CliHeader for alloy_consensus::Header {
    /// Overwrites the header's `number` field.
    fn set_number(&mut self, number: u64) {
        self.number = number;
    }
}
229
/// Node types as needed by CLI commands: [`NodeTypesForProvider`] extended with the EVM
/// configuration type and the network primitives type that go with the node's primitives.
pub trait CliNodeTypes: NodeTypesForProvider {
    /// EVM configuration type operating on this node's primitives.
    type Evm: ConfigureEvm<Primitives = Self::Primitives>;
    /// Network primitives type compatible with this node's primitives.
    type NetworkPrimitives: NetPrimitivesFor<Self::Primitives>;
}
236
/// Blanket implementation: every [`Node`] over [`FullTypesAdapter`] that also satisfies
/// [`NodeTypesForProvider`] is a [`CliNodeTypes`]. Both associated types are projected from the
/// components produced by the node's components builder.
impl<N> CliNodeTypes for N
where
    N: Node<FullTypesAdapter<Self>> + NodeTypesForProvider,
{
    // The `Evm` type of the node's built components.
    type Evm = <<N::ComponentsBuilder as NodeComponentsBuilder<FullTypesAdapter<Self>>>::Components as NodeComponents<FullTypesAdapter<Self>>>::Evm;
    // The primitives of the `Network` type of the node's built components.
    type NetworkPrimitives = <<<N::ComponentsBuilder as NodeComponentsBuilder<FullTypesAdapter<Self>>>::Components as NodeComponents<FullTypesAdapter<Self>>>::Network as NetworkEventListenerProvider>::Primitives;
}
244
/// The node components a CLI command needs: an EVM configuration and a consensus
/// implementation for the primitives of `N`.
pub trait CliNodeComponents<N: CliNodeTypes>: Send + Sync + 'static {
    /// EVM configuration type.
    type Evm: ConfigureEvm<Primitives = N::Primitives> + 'static;
    /// Consensus implementation type.
    type Consensus: FullConsensus<N::Primitives, Error = ConsensusError> + Clone + 'static;

    /// Returns a reference to the EVM configuration.
    fn evm_config(&self) -> &Self::Evm;
    /// Returns a reference to the consensus implementation.
    fn consensus(&self) -> &Self::Consensus;
}
257
258impl<N: CliNodeTypes, E, C> CliNodeComponents<N> for (E, C)
259where
260 E: ConfigureEvm<Primitives = N::Primitives> + 'static,
261 C: FullConsensus<N::Primitives, Error = ConsensusError> + Clone + 'static,
262{
263 type Evm = E;
264 type Consensus = C;
265
266 fn evm_config(&self) -> &Self::Evm {
267 &self.0
268 }
269
270 fn consensus(&self) -> &Self::Consensus {
271 &self.1
272 }
273}
274
/// A one-shot builder of [`CliNodeComponents`]: a `FnOnce` that receives the chain spec and
/// returns the components.
pub trait CliComponentsBuilder<N: CliNodeTypes>:
    FnOnce(Arc<N::ChainSpec>) -> Self::Components + Send + Sync + 'static
{
    /// The components type this builder produces.
    type Components: CliNodeComponents<N>;
}
281
/// Blanket implementation: any `Send + Sync + 'static` `FnOnce` from `Arc<N::ChainSpec>` to a
/// [`CliNodeComponents`] value is a [`CliComponentsBuilder`].
impl<N: CliNodeTypes, F, Comp> CliComponentsBuilder<N> for F
where
    F: FnOnce(Arc<N::ChainSpec>) -> Comp + Send + Sync + 'static,
    Comp: CliNodeComponents<N>,
{
    // The closure's return type is the components type.
    type Components = Comp;
}