1use alloy_primitives::B256;
4use clap::Parser;
5use reth_chainspec::EthChainSpec;
6use reth_cli::chainspec::ChainSpecParser;
7use reth_config::{config::EtlConfig, Config};
8use reth_consensus::{noop::NoopConsensus, ConsensusError, FullConsensus};
9use reth_db::{init_db, open_db_read_only, DatabaseEnv};
10use reth_db_common::init::init_genesis;
11use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
12use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider};
13use reth_node_builder::{NodeTypes, NodeTypesWithDBAdapter};
14use reth_node_core::{
15 args::{DatabaseArgs, DatadirArgs},
16 dirs::{ChainPath, DataDirPath},
17};
18use reth_provider::{
19 providers::{NodeTypesForProvider, StaticFileProvider},
20 ProviderFactory, StaticFileProviderFactory,
21};
22use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
23use reth_static_file::StaticFileProducer;
24use std::{path::PathBuf, sync::Arc};
25use tokio::sync::watch;
26use tracing::{debug, info, warn};
27
28#[derive(Debug, Parser)]
30pub struct EnvironmentArgs<C: ChainSpecParser> {
31 #[command(flatten)]
33 pub datadir: DatadirArgs,
34
35 #[arg(long, value_name = "FILE")]
37 pub config: Option<PathBuf>,
38
39 #[arg(
43 long,
44 value_name = "CHAIN_OR_PATH",
45 long_help = C::help_message(),
46 default_value = C::SUPPORTED_CHAINS[0],
47 value_parser = C::parser(),
48 global = true
49 )]
50 pub chain: Arc<C::ChainSpec>,
51
52 #[command(flatten)]
54 pub db: DatabaseArgs,
55}
56
57impl<C: ChainSpecParser> EnvironmentArgs<C> {
58 pub fn init<N: CliNodeTypes>(&self, access: AccessRights) -> eyre::Result<Environment<N>>
61 where
62 C: ChainSpecParser<ChainSpec = N::ChainSpec>,
63 {
64 let data_dir = self.datadir.clone().resolve_datadir(self.chain.chain());
65 let db_path = data_dir.db();
66 let sf_path = data_dir.static_files();
67
68 if access.is_read_write() {
69 reth_fs_util::create_dir_all(&db_path)?;
70 reth_fs_util::create_dir_all(&sf_path)?;
71 }
72
73 let config_path = self.config.clone().unwrap_or_else(|| data_dir.config());
74
75 let mut config = Config::from_path(config_path)
76 .inspect_err(
77 |err| warn!(target: "reth::cli", %err, "Failed to load config file, using default"),
78 )
79 .unwrap_or_default();
80
81 if config.stages.etl.dir.is_none() {
83 config.stages.etl.dir = Some(EtlConfig::from_datadir(data_dir.data_dir()));
84 }
85
86 info!(target: "reth::cli", ?db_path, ?sf_path, "Opening storage");
87 let (db, sfp) = match access {
88 AccessRights::RW => (
89 Arc::new(init_db(db_path, self.db.database_args())?),
90 StaticFileProvider::read_write(sf_path)?,
91 ),
92 AccessRights::RO => (
93 Arc::new(open_db_read_only(&db_path, self.db.database_args())?),
94 StaticFileProvider::read_only(sf_path, false)?,
95 ),
96 };
97
98 let provider_factory = self.create_provider_factory(&config, db, sfp)?;
99 if access.is_read_write() {
100 debug!(target: "reth::cli", chain=%self.chain.chain(), genesis=?self.chain.genesis_hash(), "Initializing genesis");
101 init_genesis(&provider_factory)?;
102 }
103
104 Ok(Environment { config, provider_factory, data_dir })
105 }
106
107 fn create_provider_factory<N: CliNodeTypes>(
113 &self,
114 config: &Config,
115 db: Arc<DatabaseEnv>,
116 static_file_provider: StaticFileProvider<N::Primitives>,
117 ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>>
118 where
119 C: ChainSpecParser<ChainSpec = N::ChainSpec>,
120 {
121 let has_receipt_pruning = config.prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
122 let prune_modes =
123 config.prune.as_ref().map(|prune| prune.segments.clone()).unwrap_or_default();
124 let factory = ProviderFactory::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::new(
125 db,
126 self.chain.clone(),
127 static_file_provider,
128 )
129 .with_prune_modes(prune_modes.clone());
130
131 if let Some(unwind_target) = factory
133 .static_file_provider()
134 .check_consistency(&factory.provider()?, has_receipt_pruning)?
135 {
136 if factory.db_ref().is_read_only()? {
137 warn!(target: "reth::cli", ?unwind_target, "Inconsistent storage. Restart node to heal.");
138 return Ok(factory)
139 }
140
141 assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");
144
145 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
146
147 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
148
149 let mut pipeline = Pipeline::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::builder()
151 .add_stages(DefaultStages::new(
152 factory.clone(),
153 tip_rx,
154 Arc::new(NoopConsensus::default()),
155 NoopHeaderDownloader::default(),
156 NoopBodiesDownloader::default(),
157 NoopBlockExecutorProvider::<N::Primitives>::default(),
158 config.stages.clone(),
159 prune_modes.clone(),
160 ))
161 .build(factory.clone(), StaticFileProducer::new(factory.clone(), prune_modes));
162
163 pipeline.move_to_static_files()?;
165 pipeline.unwind(unwind_target.unwind_target().expect("should exist"), None)?;
166 }
167
168 Ok(factory)
169 }
170}
171
172#[derive(Debug)]
174pub struct Environment<N: NodeTypes> {
175 pub config: Config,
177 pub provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
179 pub data_dir: ChainPath<DataDirPath>,
181}
182
/// Access mode requested when opening the environment's storage.
#[derive(Debug, Copy, Clone)]
pub enum AccessRights {
    /// Read-write access.
    RW,
    /// Read-only access.
    RO,
}

impl AccessRights {
    /// Returns `true` for [`AccessRights::RW`] and `false` for [`AccessRights::RO`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
198
199pub trait CliNodeTypes: NodeTypes + NodeTypesForProvider {}
202impl<N> CliNodeTypes for N where N: NodeTypes + NodeTypesForProvider {}
203
204pub trait CliNodeComponents<N: CliNodeTypes> {
206 type Executor: BlockExecutorProvider<Primitives = N::Primitives>;
208 type Consensus: FullConsensus<N::Primitives, Error = ConsensusError> + Clone + 'static;
210
211 fn executor(&self) -> &Self::Executor;
213 fn consensus(&self) -> &Self::Consensus;
215}
216
217impl<N: CliNodeTypes, E, C> CliNodeComponents<N> for (E, C)
218where
219 E: BlockExecutorProvider<Primitives = N::Primitives>,
220 C: FullConsensus<N::Primitives, Error = ConsensusError> + Clone + 'static,
221{
222 type Executor = E;
223 type Consensus = C;
224
225 fn executor(&self) -> &Self::Executor {
226 &self.0
227 }
228
229 fn consensus(&self) -> &Self::Consensus {
230 &self.1
231 }
232}