1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{
46 init_genesis_with_settings, init_genesis_with_settings_and_validate, InitStorageError,
47};
48use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
49use reth_engine_local::MiningMode;
50use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
51use reth_exex::ExExManagerHandle;
52use reth_fs_util as fs;
53use reth_network_p2p::headers::client::HeadersClient;
54use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
55use reth_node_core::{
56 args::{DefaultEraHost, PruneConfigKind},
57 dirs::{ChainPath, DataDirPath},
58 node_config::NodeConfig,
59 primitives::BlockHeader,
60 version::version_metadata,
61};
62use reth_node_metrics::{
63 chain::ChainSpecInfo,
64 hooks::Hooks,
65 recorder::install_prometheus_recorder,
66 server::{MetricServer, MetricServerConfig},
67 storage::StorageSettingsInfo,
68 version::VersionInfo,
69};
70use reth_provider::{
71 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
72 BalConfig, BalStoreHandle, BlockHashReader, BlockNumReader, InMemoryBalStore, ProviderError,
73 ProviderFactory, ProviderResult, RocksDBProviderFactory, StageCheckpointReader,
74 StaticFileProviderBuilder, StaticFileProviderFactory, StorageSettingsCache,
75};
76use reth_prune::{PruneModes, PrunerBuilder};
77use reth_rpc_builder::config::RethRpcServerConfig;
78use reth_rpc_layer::JwtSecret;
79use reth_stages::{
80 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
81 StageId, StageSet,
82};
83use reth_static_file::StaticFileProducer;
84use reth_tasks::TaskExecutor;
85use reth_tracing::{
86 throttle,
87 tracing::{debug, error, info, warn},
88};
89use reth_transaction_pool::TransactionPool;
90use reth_trie_db::ChangesetCache;
91use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism, time::Duration};
92use tokio::sync::{
93 mpsc::{unbounded_channel, UnboundedSender},
94 oneshot, watch,
95};
96
97use futures::{future::Either, stream, Stream, StreamExt};
98use reth_node_ethstats::EthStatsService;
99use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
100
101#[derive(Debug, Clone)]
120pub struct LaunchContext {
121 pub task_executor: TaskExecutor,
123 pub data_dir: ChainPath<DataDirPath>,
125}
126
127impl LaunchContext {
128 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
130 Self { task_executor, data_dir }
131 }
132
133 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
135 LaunchContextWith { inner: self, attachment }
136 }
137
138 pub fn with_loaded_toml_config<ChainSpec>(
143 self,
144 config: NodeConfig<ChainSpec>,
145 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
146 where
147 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
148 {
149 let toml_config = self.load_toml_config(&config)?;
150 Ok(self.with(WithConfigs { config, toml_config }))
151 }
152
153 pub fn load_toml_config<ChainSpec>(
158 &self,
159 config: &NodeConfig<ChainSpec>,
160 ) -> eyre::Result<reth_config::Config>
161 where
162 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
163 {
164 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
165
166 let mut toml_config = reth_config::Config::from_path(&config_path)
167 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
168
169 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
170
171 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
172
173 toml_config.peers.trusted_nodes_only |= config.network.trusted_only;
176
177 toml_config.static_files =
179 config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
180
181 Ok(toml_config)
182 }
183
184 fn save_pruning_config<ChainSpec>(
187 reth_config: &mut reth_config::Config,
188 config: &NodeConfig<ChainSpec>,
189 config_path: impl AsRef<std::path::Path>,
190 ) -> eyre::Result<()>
191 where
192 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
193 {
194 let mut should_save = reth_config.prune.segments.migrate();
195
196 if let Some(prune_config) = config.prune_config() {
197 if reth_config.prune != prune_config {
198 reth_config.set_prune_config(prune_config);
199 should_save = true;
200 }
201 } else if !reth_config.prune.is_default() {
202 info!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
203 }
204
205 if should_save {
206 info!(target: "reth::cli", "Saving prune config to toml file");
207 reth_config.save(config_path.as_ref())?;
208 }
209
210 Ok(())
211 }
212
213 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
215 self.configure_globals(reserved_cpu_cores);
216 self
217 }
218
219 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
224 match fdlimit::raise_fd_limit() {
227 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
228 debug!(from, to, "Raised file descriptor limit");
229 }
230 Ok(fdlimit::Outcome::Unsupported) => {}
231 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
232 }
233
234 let _ = reserved_cpu_cores;
238 let num_threads = available_parallelism().map_or(1, NonZeroUsize::get);
239 if let Err(err) = ThreadPoolBuilder::new()
240 .num_threads(num_threads)
241 .thread_name(|i| format!("rayon-{i:02}"))
242 .build_global()
243 {
244 warn!(%err, "Failed to build global thread pool")
245 }
246 }
247}
248
249#[derive(Debug, Clone)]
260pub struct LaunchContextWith<T> {
261 pub inner: LaunchContext,
263 pub attachment: T,
265}
266
267impl<T> LaunchContextWith<T> {
268 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
273 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
274 }
275
276 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
278 &self.inner.data_dir
279 }
280
281 pub const fn task_executor(&self) -> &TaskExecutor {
283 &self.inner.task_executor
284 }
285
286 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
288 LaunchContextWith {
289 inner: self.inner,
290 attachment: Attached::new(self.attachment, attachment),
291 }
292 }
293
294 pub fn inspect<F>(self, f: F) -> Self
297 where
298 F: FnOnce(&Self),
299 {
300 f(&self);
301 self
302 }
303}
304
305impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
306 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
308 if !self.attachment.config.network.trusted_peers.is_empty() {
309 info!(target: "reth::cli", "Adding trusted nodes");
310
311 self.attachment
312 .toml_config
313 .peers
314 .trusted_nodes
315 .extend(self.attachment.config.network.trusted_peers.clone());
316 }
317 Ok(self)
318 }
319}
320
321impl<L, R> LaunchContextWith<Attached<L, R>> {
322 pub const fn left(&self) -> &L {
324 &self.attachment.left
325 }
326
327 pub const fn right(&self) -> &R {
329 &self.attachment.right
330 }
331
332 pub const fn left_mut(&mut self) -> &mut L {
334 &mut self.attachment.left
335 }
336
337 pub const fn right_mut(&mut self) -> &mut R {
339 &mut self.attachment.right
340 }
341}
342impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
343 pub fn with_adjusted_configs(self) -> Self {
349 self.ensure_etl_datadir().with_adjusted_instance_ports()
350 }
351
352 pub fn ensure_etl_datadir(mut self) -> Self {
354 if self.toml_config_mut().stages.etl.dir.is_none() {
355 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
356 if etl_path.exists() {
357 if let Err(err) = fs::remove_dir_all(&etl_path) {
359 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
360 }
361 }
362 self.toml_config_mut().stages.etl.dir = Some(etl_path);
363 }
364
365 self
366 }
367
368 pub fn with_adjusted_instance_ports(mut self) -> Self {
370 self.node_config_mut().adjust_instance_ports();
371 self
372 }
373
374 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
376 self.attachment.left()
377 }
378
379 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
381 &self.left().config
382 }
383
384 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
386 &mut self.left_mut().config
387 }
388
389 pub const fn toml_config(&self) -> &reth_config::Config {
391 &self.left().toml_config
392 }
393
394 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
396 &mut self.left_mut().toml_config
397 }
398
399 pub fn chain_spec(&self) -> Arc<ChainSpec> {
401 self.node_config().chain.clone()
402 }
403
404 pub fn genesis_hash(&self) -> B256 {
406 self.node_config().chain.genesis_hash()
407 }
408
409 pub fn chain_id(&self) -> Chain {
411 self.node_config().chain.chain()
412 }
413
414 pub const fn is_dev(&self) -> bool {
416 self.node_config().dev.dev
417 }
418
419 pub fn prune_config(&self) -> PruneConfig
423 where
424 ChainSpec: reth_chainspec::EthereumHardforks,
425 {
426 let Some(mut node_prune_config) = self.node_config().prune_config() else {
427 return self.toml_config().prune.clone();
429 };
430
431 node_prune_config.merge(self.toml_config().prune.clone());
433 node_prune_config
434 }
435
436 pub fn prune_modes(&self) -> PruneModes
438 where
439 ChainSpec: reth_chainspec::EthereumHardforks,
440 {
441 self.prune_config().segments
442 }
443
444 pub fn pruner_builder(&self) -> PrunerBuilder
446 where
447 ChainSpec: reth_chainspec::EthereumHardforks,
448 {
449 PrunerBuilder::new(self.prune_config())
450 }
451
452 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
454 let default_jwt_path = self.data_dir().jwt();
455 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
456 Ok(secret)
457 }
458
459 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
461 where
462 Pool: TransactionPool + Unpin,
463 {
464 self.node_config().dev_mining_mode(pool)
465 }
466}
467
468impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
469where
470 DB: Database + Clone + 'static,
471 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
472{
473 pub async fn create_provider_factory<N, Evm>(
477 &self,
478 changeset_cache: ChangesetCache,
479 rocksdb_provider: Option<RocksDBProvider>,
480 disabled_stages: &[StageId],
481 ) -> eyre::Result<ProviderFactory<N>>
482 where
483 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
484 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
485 {
486 let static_files_config = &self.toml_config().static_files;
488 static_files_config.validate()?;
489
490 let static_file_provider =
492 StaticFileProviderBuilder::read_write(self.data_dir().static_files())
493 .with_metrics()
494 .with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
495 .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
496 .build()?;
497
498 let rocksdb_provider = if let Some(provider) = rocksdb_provider {
500 provider
501 } else {
502 RocksDBProvider::builder(self.data_dir().rocksdb())
503 .with_default_tables()
504 .with_metrics()
505 .with_statistics()
506 .build()?
507 };
508
509 let prune_config = self.prune_config();
510 let balstore_cache_size = self
511 .node_config()
512 .db
513 .balstore_cache_size
514 .unwrap_or(BalConfig::DEFAULT_IN_MEMORY_RETENTION_DISTANCE);
515 let bal_store = BalStoreHandle::new(InMemoryBalStore::new(
516 BalConfig::with_in_memory_retention_distance(balstore_cache_size),
517 ));
518 let factory = ProviderFactory::new(
519 self.right().clone(),
520 self.chain_spec(),
521 static_file_provider,
522 rocksdb_provider,
523 self.task_executor().clone(),
524 )?
525 .with_prune_modes(prune_config.segments)
526 .with_minimum_pruning_distance(prune_config.minimum_pruning_distance)
527 .with_changeset_cache(changeset_cache)
528 .with_bal_store(bal_store);
529
530 let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
534
535 let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
537
538 if let Some(unwind_block) = unwind_target {
539 let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
542 (Some(_), Some(_)) => "RocksDB and static file",
543 (Some(_), None) => "RocksDB",
544 (None, Some(_)) => "static file",
545 (None, None) => unreachable!(),
546 };
547 assert_ne!(
548 unwind_block, 0,
549 "A {} inconsistency was found that would trigger an unwind to block 0",
550 inconsistency_source
551 );
552
553 let unwind_target = PipelineTarget::Unwind(unwind_block);
554
555 info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
556
557 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
558
559 let pipeline = PipelineBuilder::default()
561 .add_stages(
562 DefaultStages::new(
563 factory.clone(),
564 tip_rx,
565 Arc::new(NoopConsensus::default()),
566 NoopHeaderDownloader::default(),
567 NoopBodiesDownloader::default(),
568 NoopEvmConfig::<Evm>::default(),
569 self.toml_config().stages.clone(),
570 self.prune_modes(),
571 None,
572 )
573 .builder()
574 .disable_all(disabled_stages),
575 )
576 .build(
577 factory.clone(),
578 StaticFileProducer::new(factory.clone(), self.prune_modes()),
579 );
580
581 let (tx, rx) = oneshot::channel();
583
584 self.task_executor().spawn_critical_blocking_task("pipeline task", async move {
586 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
587 let _ = tx.send(result);
588 });
589 rx.await?.inspect_err(|err| {
590 error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
591 })?;
592 }
593
594 Ok(factory)
595 }
596
597 pub async fn with_provider_factory<N, Evm>(
599 self,
600 changeset_cache: ChangesetCache,
601 rocksdb_provider: Option<RocksDBProvider>,
602 disabled_stages: &[StageId],
603 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
604 where
605 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
606 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
607 {
608 let factory = self
609 .create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider, disabled_stages)
610 .await?;
611 let ctx = LaunchContextWith {
612 inner: self.inner,
613 attachment: self.attachment.map_right(|_| factory),
614 };
615
616 Ok(ctx)
617 }
618}
619
620impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
621where
622 T: ProviderNodeTypes,
623{
624 pub const fn database(&self) -> &T::DB {
626 self.right().db_ref()
627 }
628
629 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
631 self.right()
632 }
633
634 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
636 self.right().static_file_provider()
637 }
638
639 pub async fn with_prometheus_server(self) -> eyre::Result<Self>
643 where
644 T::ChainSpec: EthereumHardforks,
645 {
646 self.start_prometheus_endpoint().await?;
647 Ok(self)
648 }
649
650 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()>
652 where
653 T::ChainSpec: EthereumHardforks,
654 {
655 install_prometheus_recorder().spawn_upkeep();
657
658 let listen_addr = self.node_config().metrics.prometheus;
659 if let Some(addr) = listen_addr {
660 let prune_config = self.prune_config();
661 let pruning_mode =
662 PruneConfigKind::from_config(&prune_config, self.chain_spec().as_ref()).as_str();
663 let storage_settings =
667 if self.provider_factory().get_stage_checkpoint(StageId::Headers)?.is_some() {
668 self.provider_factory().cached_storage_settings()
669 } else {
670 self.node_config().storage_settings()
671 };
672 let config = MetricServerConfig::new(
673 addr,
674 VersionInfo {
675 version: version_metadata().cargo_pkg_version.as_ref(),
676 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
677 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
678 git_sha: version_metadata().vergen_git_sha.as_ref(),
679 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
680 build_profile: version_metadata().build_profile_name.as_ref(),
681 },
682 ChainSpecInfo { name: self.chain_id().to_string() },
683 self.task_executor().clone(),
684 metrics_hooks(self.provider_factory()),
685 self.data_dir().pprof_dumps(),
686 )
687 .with_storage_settings_info(StorageSettingsInfo {
688 storage_v2: storage_settings.storage_v2,
689 pruning_mode,
690 prune_config: serde_json::to_string(&prune_config)
691 .expect("serializing PruneConfig should not fail"),
692 })
693 .with_push_gateway(
694 self.node_config().metrics.push_gateway_url.clone(),
695 self.node_config().metrics.push_gateway_interval,
696 );
697
698 MetricServer::new(config).serve().await?;
699 }
700
701 Ok(())
702 }
703
704 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
706 init_genesis_with_settings_and_validate(
707 self.provider_factory(),
708 self.node_config().storage_settings(),
709 !self.node_config().debug.skip_genesis_validation,
710 )?;
711 Ok(self)
712 }
713
714 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
716 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())
717 }
718
719 pub fn with_metrics_task(
725 self,
726 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
727 let (metrics_sender, metrics_receiver) = unbounded_channel();
728
729 let with_metrics =
730 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
731
732 debug!(target: "reth::cli", "Spawning stages metrics listener task");
733 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
734 self.task_executor()
735 .spawn_critical_task("stages metrics listener task", sync_metrics_listener);
736
737 LaunchContextWith {
738 inner: self.inner,
739 attachment: self.attachment.map_right(|_| with_metrics),
740 }
741 }
742}
743
744impl<N, DB>
745 LaunchContextWith<
746 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
747 >
748where
749 N: NodeTypes,
750 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
751{
752 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
754 &self.right().provider_factory
755 }
756
757 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
759 self.right().metrics_sender.clone()
760 }
761
762 #[expect(clippy::complexity)]
764 pub fn with_blockchain_db<T, F>(
765 self,
766 create_blockchain_provider: F,
767 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
768 where
769 T: FullNodeTypes<Types = N, DB = DB>,
770 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
771 {
772 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
773
774 let metered_providers = WithMeteredProviders {
775 db_provider_container: WithMeteredProvider {
776 provider_factory: self.provider_factory().clone(),
777 metrics_sender: self.sync_metrics_tx(),
778 },
779 blockchain_db,
780 };
781
782 let ctx = LaunchContextWith {
783 inner: self.inner,
784 attachment: self.attachment.map_right(|_| metered_providers),
785 };
786
787 Ok(ctx)
788 }
789}
790
791impl<T>
792 LaunchContextWith<
793 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
794 >
795where
796 T: FullNodeTypes<Types: NodeTypesForProvider>,
797{
798 pub const fn database(&self) -> &T::DB {
800 self.provider_factory().db_ref()
801 }
802
803 pub const fn provider_factory(
805 &self,
806 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
807 &self.right().db_provider_container.provider_factory
808 }
809
810 pub fn lookup_head(&self) -> eyre::Result<Head> {
814 self.node_config()
815 .lookup_head(self.provider_factory())
816 .wrap_err("the head block is missing")
817 }
818
819 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
821 self.right().db_provider_container.metrics_sender.clone()
822 }
823
824 pub const fn blockchain_db(&self) -> &T::Provider {
826 &self.right().blockchain_db
827 }
828
829 pub async fn with_components<CB>(
831 self,
832 components_builder: CB,
833 on_component_initialized: Box<
834 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
835 >,
836 ) -> eyre::Result<
837 LaunchContextWith<
838 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
839 >,
840 >
841 where
842 CB: NodeComponentsBuilder<T>,
843 {
844 let head = self.lookup_head()?;
846
847 let builder_ctx = BuilderContext::new(
848 head,
849 self.blockchain_db().clone(),
850 self.task_executor().clone(),
851 self.configs().clone(),
852 );
853
854 debug!(target: "reth::cli", "creating components");
855 let components = components_builder.build_components(&builder_ctx).await?;
856
857 let blockchain_db = self.blockchain_db().clone();
858
859 let node_adapter = NodeAdapter {
860 components,
861 task_executor: self.task_executor().clone(),
862 provider: blockchain_db,
863 };
864
865 debug!(target: "reth::cli", "calling on_component_initialized hook");
866 on_component_initialized.on_event(node_adapter.clone())?;
867
868 let components_container = WithComponents {
869 db_provider_container: WithMeteredProvider {
870 provider_factory: self.provider_factory().clone(),
871 metrics_sender: self.sync_metrics_tx(),
872 },
873 node_adapter,
874 head,
875 };
876
877 let ctx = LaunchContextWith {
878 inner: self.inner,
879 attachment: self.attachment.map_right(|_| components_container),
880 };
881
882 Ok(ctx)
883 }
884}
885
886impl<T, CB>
887 LaunchContextWith<
888 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
889 >
890where
891 T: FullNodeTypes<Types: NodeTypesForProvider>,
892 CB: NodeComponentsBuilder<T>,
893{
894 pub const fn provider_factory(
896 &self,
897 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
898 &self.right().db_provider_container.provider_factory
899 }
900
901 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
904 where
905 C: HeadersClient<Header: BlockHeader>,
906 {
907 self.node_config().max_block(client, self.provider_factory().clone()).await
908 }
909
910 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
912 self.provider_factory().static_file_provider()
913 }
914
915 pub fn static_file_producer(
917 &self,
918 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
919 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
920 }
921
922 pub const fn head(&self) -> Head {
924 self.right().head
925 }
926
927 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
929 &self.right().node_adapter
930 }
931
932 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
934 &mut self.right_mut().node_adapter
935 }
936
937 pub const fn blockchain_db(&self) -> &T::Provider {
939 &self.node_adapter().provider
940 }
941
942 pub fn initial_backfill_target(
948 &self,
949 disabled_stages: &[StageId],
950 ) -> ProviderResult<Option<B256>> {
951 let mut initial_target = self.node_config().debug.tip;
952
953 if initial_target.is_none() {
954 initial_target = self.check_pipeline_consistency(disabled_stages)?;
955 }
956
957 Ok(initial_target)
958 }
959
960 pub const fn terminate_after_initial_backfill(&self) -> bool {
966 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
967 }
968
969 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
974 if self.chain_spec().is_optimism() &&
975 !self.is_dev() &&
976 self.chain_id() == Chain::optimism_mainnet()
977 {
978 let latest = self.blockchain_db().last_block_number()?;
979 if latest < 105235063 {
981 error!(
982 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
983 );
984 return Err(ProviderError::BestBlockNotFound);
985 }
986 }
987
988 Ok(())
989 }
990
991 pub fn check_pipeline_consistency(
1003 &self,
1004 disabled_stages: &[StageId],
1005 ) -> ProviderResult<Option<B256>> {
1006 let era_enabled = self.era_import_source().is_some();
1008 let mut all_stages = StageId::ALL
1009 .into_iter()
1010 .filter(|id| (era_enabled || id != &StageId::Era) && !disabled_stages.contains(id));
1011
1012 let first_stage = all_stages.next().expect("there must be at least one stage");
1014
1015 let first_stage_checkpoint = self
1018 .blockchain_db()
1019 .get_stage_checkpoint(first_stage)?
1020 .unwrap_or_default()
1021 .block_number;
1022
1023 for stage_id in all_stages {
1025 let stage_checkpoint = self
1026 .blockchain_db()
1027 .get_stage_checkpoint(stage_id)?
1028 .unwrap_or_default()
1029 .block_number;
1030
1031 debug!(
1034 target: "consensus::engine",
1035 first_stage_id = %first_stage,
1036 first_stage_checkpoint,
1037 stage_id = %stage_id,
1038 stage_checkpoint = stage_checkpoint,
1039 "Checking stage against first stage",
1040 );
1041 if stage_checkpoint < first_stage_checkpoint {
1042 debug!(
1043 target: "consensus::engine",
1044 first_stage_id = %first_stage,
1045 first_stage_checkpoint,
1046 inconsistent_stage_id = %stage_id,
1047 inconsistent_stage_checkpoint = stage_checkpoint,
1048 "Pipeline sync progress is inconsistent"
1049 );
1050 return self.blockchain_db().block_hash(first_stage_checkpoint);
1051 }
1052 }
1053
1054 self.ensure_chain_specific_db_checks()?;
1055
1056 Ok(None)
1057 }
1058
1059 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
1061 self.right().db_provider_container.metrics_sender.clone()
1062 }
1063
1064 pub const fn components(&self) -> &CB::Components {
1066 &self.node_adapter().components
1067 }
1068
1069 #[expect(clippy::type_complexity)]
1071 pub async fn launch_exex(
1072 &self,
1073 installed_exex: Vec<(
1074 String,
1075 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1076 )>,
1077 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1078 self.exex_launcher(installed_exex).launch().await
1079 }
1080
1081 #[expect(clippy::type_complexity)]
1093 pub fn exex_launcher(
1094 &self,
1095 installed_exex: Vec<(
1096 String,
1097 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1098 )>,
1099 ) -> ExExLauncher<NodeAdapter<T, CB::Components>> {
1100 ExExLauncher::new(
1101 self.head(),
1102 self.node_adapter().clone(),
1103 installed_exex,
1104 self.configs().clone(),
1105 )
1106 }
1107
1108 pub fn era_import_source(&self) -> Option<EraImportSource> {
1112 let node_config = self.node_config();
1113 if !node_config.era.enabled {
1114 return None;
1115 }
1116
1117 EraImportSource::maybe_new(
1118 node_config.era.source.path.clone(),
1119 node_config.era.source.url.clone(),
1120 || node_config.chain.chain().kind().default_era_host(),
1121 || node_config.datadir().data_dir().join("era").into(),
1122 )
1123 }
1124
1125 pub fn consensus_layer_events(
1133 &self,
1134 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1135 where
1136 T::Provider: reth_provider::CanonChainTracker,
1137 {
1138 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1139 Either::Left(
1140 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1141 .map(Into::into),
1142 )
1143 } else {
1144 Either::Right(stream::empty())
1145 }
1146 }
1147
1148 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1150 where
1151 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1152 + Send
1153 + Unpin
1154 + 'static,
1155 {
1156 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1157
1158 let network = self.components().network().clone();
1159 let pool = self.components().pool().clone();
1160 let provider = self.node_adapter().provider.clone();
1161
1162 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1163
1164 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1165
1166 let ethstats_for_events = ethstats.clone();
1168 let task_executor = self.task_executor().clone();
1169 task_executor.spawn_task(async move {
1170 while let Some(event) = engine_events.next().await {
1171 use reth_engine_primitives::ConsensusEngineEvent;
1172 match event {
1173 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1174 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1175 let block_hash = executed.recovered_block.num_hash().hash;
1176 let block_number = executed.recovered_block.num_hash().number;
1177 if let Err(e) = ethstats_for_events
1178 .report_new_payload(block_hash, block_number, duration)
1179 .await
1180 {
1181 debug!(
1182 target: "ethstats",
1183 "Failed to report new payload: {}", e
1184 );
1185 }
1186 }
1187 _ => {
1188 }
1190 }
1191 }
1192 });
1193
1194 task_executor.spawn_task(async move { ethstats.run().await });
1196
1197 Ok(())
1198 }
1199}
1200
1201#[derive(Clone, Copy, Debug)]
1207pub struct Attached<L, R> {
1208 left: L,
1209 right: R,
1210}
1211
1212impl<L, R> Attached<L, R> {
1213 pub const fn new(left: L, right: R) -> Self {
1215 Self { left, right }
1216 }
1217
1218 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1220 where
1221 F: FnOnce(L) -> T,
1222 {
1223 Attached::new(f(self.left), self.right)
1224 }
1225
1226 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1228 where
1229 F: FnOnce(R) -> T,
1230 {
1231 Attached::new(self.left, f(self.right))
1232 }
1233
1234 pub const fn left(&self) -> &L {
1236 &self.left
1237 }
1238
1239 pub const fn right(&self) -> &R {
1241 &self.right
1242 }
1243
1244 pub const fn left_mut(&mut self) -> &mut L {
1246 &mut self.left
1247 }
1248
1249 pub const fn right_mut(&mut self) -> &mut R {
1251 &mut self.right
1252 }
1253}
1254
1255#[derive(Debug)]
1258pub struct WithConfigs<ChainSpec> {
1259 pub config: NodeConfig<ChainSpec>,
1261 pub toml_config: reth_config::Config,
1263}
1264
1265impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1266 fn clone(&self) -> Self {
1267 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1268 }
1269}
1270
1271#[derive(Debug, Clone)]
1274pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1275 provider_factory: ProviderFactory<N>,
1276 metrics_sender: UnboundedSender<MetricEvent>,
1277}
1278
1279#[expect(missing_debug_implementations)]
1282pub struct WithMeteredProviders<T>
1283where
1284 T: FullNodeTypes,
1285{
1286 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1287 blockchain_db: T::Provider,
1288}
1289
1290#[expect(missing_debug_implementations)]
1292pub struct WithComponents<T, CB>
1293where
1294 T: FullNodeTypes,
1295 CB: NodeComponentsBuilder<T>,
1296{
1297 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1298 node_adapter: NodeAdapter<T, CB::Components>,
1299 head: Head,
1300}
1301
1302pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1304 Hooks::builder()
1305 .with_hook({
1306 let db = provider_factory.db_ref().clone();
1307 move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1308 })
1309 .with_hook({
1310 let sfp = provider_factory.static_file_provider();
1311 move || {
1312 throttle!(Duration::from_secs(5 * 60), || {
1313 if let Err(error) = sfp.report_metrics() {
1314 error!(%error, "Failed to report metrics from static file provider");
1315 }
1316 })
1317 }
1318 })
1319 .with_hook({
1320 let rocksdb = provider_factory.rocksdb_provider();
1321 move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1322 })
1323 .build()
1324}
1325
1326#[cfg(test)]
1327mod tests {
1328 use super::{LaunchContext, NodeConfig};
1329 use reth_config::Config;
1330 use reth_node_core::args::PruningArgs;
1331
1332 const EXTENSION: &str = "toml";
1333
1334 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1335 let temp_dir = tempfile::tempdir().unwrap();
1336 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1337 proc(&config_path);
1338 temp_dir.close().unwrap()
1339 }
1340
1341 #[test]
1342 fn test_save_prune_config() {
1343 with_tempdir("prune-store-test", |config_path| {
1344 let mut reth_config = Config::default();
1345 let node_config = NodeConfig {
1346 pruning: PruningArgs {
1347 full: true,
1348 minimal: false,
1349 block_interval: None,
1350 sender_recovery_full: false,
1351 sender_recovery_distance: None,
1352 sender_recovery_before: None,
1353 transaction_lookup_full: false,
1354 transaction_lookup_distance: None,
1355 transaction_lookup_before: None,
1356 receipts_full: false,
1357 receipts_pre_merge: false,
1358 receipts_distance: None,
1359 receipts_before: None,
1360 account_history_full: false,
1361 account_history_distance: None,
1362 account_history_before: None,
1363 storage_history_full: false,
1364 storage_history_distance: None,
1365 storage_history_before: None,
1366 bodies_pre_merge: false,
1367 bodies_distance: None,
1368 receipts_log_filter: None,
1369 bodies_before: None,
1370 minimum_distance: None,
1371 },
1372 ..NodeConfig::test()
1373 };
1374 LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
1375 .unwrap();
1376
1377 let loaded_config = Config::from_path(config_path).unwrap();
1378
1379 assert_eq!(reth_config, loaded_config);
1380 })
1381 }
1382}