1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{
46 init_genesis_with_settings, init_genesis_with_settings_and_validate, InitStorageError,
47};
48use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
49use reth_engine_local::MiningMode;
50use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
51use reth_exex::ExExManagerHandle;
52use reth_fs_util as fs;
53use reth_network_p2p::headers::client::HeadersClient;
54use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
55use reth_node_core::{
56 args::{DefaultEraHost, PruneConfigKind},
57 dirs::{ChainPath, DataDirPath},
58 node_config::NodeConfig,
59 primitives::BlockHeader,
60 version::version_metadata,
61};
62use reth_node_metrics::{
63 chain::ChainSpecInfo,
64 hooks::Hooks,
65 recorder::install_prometheus_recorder,
66 server::{MetricServer, MetricServerConfig},
67 storage::StorageSettingsInfo,
68 version::VersionInfo,
69};
70use reth_provider::{
71 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
72 BalConfig, BalStoreHandle, BlockHashReader, BlockNumReader, InMemoryBalStore, ProviderError,
73 ProviderFactory, ProviderResult, RocksDBProviderFactory, StageCheckpointReader,
74 StaticFileProviderBuilder, StaticFileProviderFactory, StorageSettingsCache,
75};
76use reth_prune::{PruneModes, PrunerBuilder};
77use reth_rpc_builder::config::RethRpcServerConfig;
78use reth_rpc_layer::JwtSecret;
79use reth_stages::{
80 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
81 StageId,
82};
83use reth_static_file::StaticFileProducer;
84use reth_tasks::TaskExecutor;
85use reth_tracing::{
86 throttle,
87 tracing::{debug, error, info, warn},
88};
89use reth_transaction_pool::TransactionPool;
90use reth_trie_db::ChangesetCache;
91use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism, time::Duration};
92use tokio::sync::{
93 mpsc::{unbounded_channel, UnboundedSender},
94 oneshot, watch,
95};
96
97use futures::{future::Either, stream, Stream, StreamExt};
98use reth_node_ethstats::EthStatsService;
99use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
100
101#[derive(Debug, Clone)]
120pub struct LaunchContext {
121 pub task_executor: TaskExecutor,
123 pub data_dir: ChainPath<DataDirPath>,
125}
126
127impl LaunchContext {
128 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
130 Self { task_executor, data_dir }
131 }
132
133 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
135 LaunchContextWith { inner: self, attachment }
136 }
137
138 pub fn with_loaded_toml_config<ChainSpec>(
143 self,
144 config: NodeConfig<ChainSpec>,
145 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
146 where
147 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
148 {
149 let toml_config = self.load_toml_config(&config)?;
150 Ok(self.with(WithConfigs { config, toml_config }))
151 }
152
153 pub fn load_toml_config<ChainSpec>(
158 &self,
159 config: &NodeConfig<ChainSpec>,
160 ) -> eyre::Result<reth_config::Config>
161 where
162 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
163 {
164 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
165
166 let mut toml_config = reth_config::Config::from_path(&config_path)
167 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
168
169 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
170
171 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
172
173 toml_config.peers.trusted_nodes_only |= config.network.trusted_only;
176
177 toml_config.static_files =
179 config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
180
181 Ok(toml_config)
182 }
183
184 fn save_pruning_config<ChainSpec>(
187 reth_config: &mut reth_config::Config,
188 config: &NodeConfig<ChainSpec>,
189 config_path: impl AsRef<std::path::Path>,
190 ) -> eyre::Result<()>
191 where
192 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
193 {
194 let mut should_save = reth_config.prune.segments.migrate();
195
196 if let Some(prune_config) = config.prune_config() {
197 if reth_config.prune != prune_config {
198 reth_config.set_prune_config(prune_config);
199 should_save = true;
200 }
201 } else if !reth_config.prune.is_default() {
202 info!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
203 }
204
205 if should_save {
206 info!(target: "reth::cli", "Saving prune config to toml file");
207 reth_config.save(config_path.as_ref())?;
208 }
209
210 Ok(())
211 }
212
213 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
215 self.configure_globals(reserved_cpu_cores);
216 self
217 }
218
219 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
224 match fdlimit::raise_fd_limit() {
227 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
228 debug!(from, to, "Raised file descriptor limit");
229 }
230 Ok(fdlimit::Outcome::Unsupported) => {}
231 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
232 }
233
234 let _ = reserved_cpu_cores;
238 let num_threads = available_parallelism().map_or(1, NonZeroUsize::get);
239 if let Err(err) = ThreadPoolBuilder::new()
240 .num_threads(num_threads)
241 .thread_name(|i| format!("rayon-{i:02}"))
242 .build_global()
243 {
244 warn!(%err, "Failed to build global thread pool")
245 }
246 }
247}
248
249#[derive(Debug, Clone)]
260pub struct LaunchContextWith<T> {
261 pub inner: LaunchContext,
263 pub attachment: T,
265}
266
267impl<T> LaunchContextWith<T> {
268 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
273 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
274 }
275
276 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
278 &self.inner.data_dir
279 }
280
281 pub const fn task_executor(&self) -> &TaskExecutor {
283 &self.inner.task_executor
284 }
285
286 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
288 LaunchContextWith {
289 inner: self.inner,
290 attachment: Attached::new(self.attachment, attachment),
291 }
292 }
293
294 pub fn inspect<F>(self, f: F) -> Self
297 where
298 F: FnOnce(&Self),
299 {
300 f(&self);
301 self
302 }
303}
304
305impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
306 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
308 if !self.attachment.config.network.trusted_peers.is_empty() {
309 info!(target: "reth::cli", "Adding trusted nodes");
310
311 self.attachment
312 .toml_config
313 .peers
314 .trusted_nodes
315 .extend(self.attachment.config.network.trusted_peers.clone());
316 }
317 Ok(self)
318 }
319}
320
321impl<L, R> LaunchContextWith<Attached<L, R>> {
322 pub const fn left(&self) -> &L {
324 &self.attachment.left
325 }
326
327 pub const fn right(&self) -> &R {
329 &self.attachment.right
330 }
331
332 pub const fn left_mut(&mut self) -> &mut L {
334 &mut self.attachment.left
335 }
336
337 pub const fn right_mut(&mut self) -> &mut R {
339 &mut self.attachment.right
340 }
341}
342impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
343 pub fn with_adjusted_configs(self) -> Self {
349 self.ensure_etl_datadir().with_adjusted_instance_ports()
350 }
351
352 pub fn ensure_etl_datadir(mut self) -> Self {
354 if self.toml_config_mut().stages.etl.dir.is_none() {
355 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
356 if etl_path.exists() {
357 if let Err(err) = fs::remove_dir_all(&etl_path) {
359 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
360 }
361 }
362 self.toml_config_mut().stages.etl.dir = Some(etl_path);
363 }
364
365 self
366 }
367
368 pub fn with_adjusted_instance_ports(mut self) -> Self {
370 self.node_config_mut().adjust_instance_ports();
371 self
372 }
373
374 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
376 self.attachment.left()
377 }
378
379 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
381 &self.left().config
382 }
383
384 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
386 &mut self.left_mut().config
387 }
388
389 pub const fn toml_config(&self) -> &reth_config::Config {
391 &self.left().toml_config
392 }
393
394 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
396 &mut self.left_mut().toml_config
397 }
398
399 pub fn chain_spec(&self) -> Arc<ChainSpec> {
401 self.node_config().chain.clone()
402 }
403
404 pub fn genesis_hash(&self) -> B256 {
406 self.node_config().chain.genesis_hash()
407 }
408
409 pub fn chain_id(&self) -> Chain {
411 self.node_config().chain.chain()
412 }
413
414 pub const fn is_dev(&self) -> bool {
416 self.node_config().dev.dev
417 }
418
419 pub fn prune_config(&self) -> PruneConfig
423 where
424 ChainSpec: reth_chainspec::EthereumHardforks,
425 {
426 let Some(mut node_prune_config) = self.node_config().prune_config() else {
427 return self.toml_config().prune.clone();
429 };
430
431 node_prune_config.merge(self.toml_config().prune.clone());
433 node_prune_config
434 }
435
436 pub fn prune_modes(&self) -> PruneModes
438 where
439 ChainSpec: reth_chainspec::EthereumHardforks,
440 {
441 self.prune_config().segments
442 }
443
444 pub fn pruner_builder(&self) -> PrunerBuilder
446 where
447 ChainSpec: reth_chainspec::EthereumHardforks,
448 {
449 PrunerBuilder::new(self.prune_config())
450 }
451
452 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
454 let default_jwt_path = self.data_dir().jwt();
455 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
456 Ok(secret)
457 }
458
459 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
461 where
462 Pool: TransactionPool + Unpin,
463 {
464 self.node_config().dev_mining_mode(pool)
465 }
466}
467
468impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
469where
470 DB: Database + Clone + 'static,
471 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
472{
473 pub async fn create_provider_factory<N, Evm>(
477 &self,
478 changeset_cache: ChangesetCache,
479 rocksdb_provider: Option<RocksDBProvider>,
480 ) -> eyre::Result<ProviderFactory<N>>
481 where
482 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
483 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
484 {
485 let static_files_config = &self.toml_config().static_files;
487 static_files_config.validate()?;
488
489 let static_file_provider =
491 StaticFileProviderBuilder::read_write(self.data_dir().static_files())
492 .with_metrics()
493 .with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
494 .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
495 .build()?;
496
497 let rocksdb_provider = if let Some(provider) = rocksdb_provider {
499 provider
500 } else {
501 RocksDBProvider::builder(self.data_dir().rocksdb())
502 .with_default_tables()
503 .with_metrics()
504 .with_statistics()
505 .build()?
506 };
507
508 let prune_config = self.prune_config();
509 let balstore_cache_size = self
510 .node_config()
511 .db
512 .balstore_cache_size
513 .unwrap_or(BalConfig::DEFAULT_IN_MEMORY_RETENTION_DISTANCE);
514 let bal_store = BalStoreHandle::new(InMemoryBalStore::new(
515 BalConfig::with_in_memory_retention_distance(balstore_cache_size),
516 ));
517 let factory = ProviderFactory::new(
518 self.right().clone(),
519 self.chain_spec(),
520 static_file_provider,
521 rocksdb_provider,
522 self.task_executor().clone(),
523 )?
524 .with_prune_modes(prune_config.segments)
525 .with_minimum_pruning_distance(prune_config.minimum_pruning_distance)
526 .with_changeset_cache(changeset_cache)
527 .with_bal_store(bal_store);
528
529 let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
533
534 let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
536
537 if let Some(unwind_block) = unwind_target {
538 let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
541 (Some(_), Some(_)) => "RocksDB and static file",
542 (Some(_), None) => "RocksDB",
543 (None, Some(_)) => "static file",
544 (None, None) => unreachable!(),
545 };
546 assert_ne!(
547 unwind_block, 0,
548 "A {} inconsistency was found that would trigger an unwind to block 0",
549 inconsistency_source
550 );
551
552 let unwind_target = PipelineTarget::Unwind(unwind_block);
553
554 info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
555
556 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
557
558 let pipeline = PipelineBuilder::default()
560 .add_stages(DefaultStages::new(
561 factory.clone(),
562 tip_rx,
563 Arc::new(NoopConsensus::default()),
564 NoopHeaderDownloader::default(),
565 NoopBodiesDownloader::default(),
566 NoopEvmConfig::<Evm>::default(),
567 self.toml_config().stages.clone(),
568 self.prune_modes(),
569 None,
570 ))
571 .build(
572 factory.clone(),
573 StaticFileProducer::new(factory.clone(), self.prune_modes()),
574 );
575
576 let (tx, rx) = oneshot::channel();
578
579 self.task_executor().spawn_critical_blocking_task("pipeline task", async move {
581 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
582 let _ = tx.send(result);
583 });
584 rx.await?.inspect_err(|err| {
585 error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
586 })?;
587 }
588
589 Ok(factory)
590 }
591
592 pub async fn with_provider_factory<N, Evm>(
594 self,
595 changeset_cache: ChangesetCache,
596 rocksdb_provider: Option<RocksDBProvider>,
597 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
598 where
599 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
600 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
601 {
602 let factory =
603 self.create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider).await?;
604 let ctx = LaunchContextWith {
605 inner: self.inner,
606 attachment: self.attachment.map_right(|_| factory),
607 };
608
609 Ok(ctx)
610 }
611}
612
613impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
614where
615 T: ProviderNodeTypes,
616{
617 pub const fn database(&self) -> &T::DB {
619 self.right().db_ref()
620 }
621
622 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
624 self.right()
625 }
626
627 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
629 self.right().static_file_provider()
630 }
631
632 pub async fn with_prometheus_server(self) -> eyre::Result<Self>
636 where
637 T::ChainSpec: EthereumHardforks,
638 {
639 self.start_prometheus_endpoint().await?;
640 Ok(self)
641 }
642
643 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()>
645 where
646 T::ChainSpec: EthereumHardforks,
647 {
648 install_prometheus_recorder().spawn_upkeep();
650
651 let listen_addr = self.node_config().metrics.prometheus;
652 if let Some(addr) = listen_addr {
653 let prune_config = self.prune_config();
654 let pruning_mode =
655 PruneConfigKind::from_config(&prune_config, self.chain_spec().as_ref()).as_str();
656 let storage_settings =
660 if self.provider_factory().get_stage_checkpoint(StageId::Headers)?.is_some() {
661 self.provider_factory().cached_storage_settings()
662 } else {
663 self.node_config().storage_settings()
664 };
665 let config = MetricServerConfig::new(
666 addr,
667 VersionInfo {
668 version: version_metadata().cargo_pkg_version.as_ref(),
669 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
670 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
671 git_sha: version_metadata().vergen_git_sha.as_ref(),
672 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
673 build_profile: version_metadata().build_profile_name.as_ref(),
674 },
675 ChainSpecInfo { name: self.chain_id().to_string() },
676 self.task_executor().clone(),
677 metrics_hooks(self.provider_factory()),
678 self.data_dir().pprof_dumps(),
679 )
680 .with_storage_settings_info(StorageSettingsInfo {
681 storage_v2: storage_settings.storage_v2,
682 pruning_mode,
683 prune_config: serde_json::to_string(&prune_config)
684 .expect("serializing PruneConfig should not fail"),
685 })
686 .with_push_gateway(
687 self.node_config().metrics.push_gateway_url.clone(),
688 self.node_config().metrics.push_gateway_interval,
689 );
690
691 MetricServer::new(config).serve().await?;
692 }
693
694 Ok(())
695 }
696
697 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
699 init_genesis_with_settings_and_validate(
700 self.provider_factory(),
701 self.node_config().storage_settings(),
702 !self.node_config().debug.skip_genesis_validation,
703 )?;
704 Ok(self)
705 }
706
707 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
709 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())
710 }
711
712 pub fn with_metrics_task(
718 self,
719 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
720 let (metrics_sender, metrics_receiver) = unbounded_channel();
721
722 let with_metrics =
723 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
724
725 debug!(target: "reth::cli", "Spawning stages metrics listener task");
726 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
727 self.task_executor()
728 .spawn_critical_task("stages metrics listener task", sync_metrics_listener);
729
730 LaunchContextWith {
731 inner: self.inner,
732 attachment: self.attachment.map_right(|_| with_metrics),
733 }
734 }
735}
736
737impl<N, DB>
738 LaunchContextWith<
739 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
740 >
741where
742 N: NodeTypes,
743 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
744{
745 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
747 &self.right().provider_factory
748 }
749
750 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
752 self.right().metrics_sender.clone()
753 }
754
755 #[expect(clippy::complexity)]
757 pub fn with_blockchain_db<T, F>(
758 self,
759 create_blockchain_provider: F,
760 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
761 where
762 T: FullNodeTypes<Types = N, DB = DB>,
763 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
764 {
765 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
766
767 let metered_providers = WithMeteredProviders {
768 db_provider_container: WithMeteredProvider {
769 provider_factory: self.provider_factory().clone(),
770 metrics_sender: self.sync_metrics_tx(),
771 },
772 blockchain_db,
773 };
774
775 let ctx = LaunchContextWith {
776 inner: self.inner,
777 attachment: self.attachment.map_right(|_| metered_providers),
778 };
779
780 Ok(ctx)
781 }
782}
783
784impl<T>
785 LaunchContextWith<
786 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
787 >
788where
789 T: FullNodeTypes<Types: NodeTypesForProvider>,
790{
791 pub const fn database(&self) -> &T::DB {
793 self.provider_factory().db_ref()
794 }
795
796 pub const fn provider_factory(
798 &self,
799 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
800 &self.right().db_provider_container.provider_factory
801 }
802
803 pub fn lookup_head(&self) -> eyre::Result<Head> {
807 self.node_config()
808 .lookup_head(self.provider_factory())
809 .wrap_err("the head block is missing")
810 }
811
812 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
814 self.right().db_provider_container.metrics_sender.clone()
815 }
816
817 pub const fn blockchain_db(&self) -> &T::Provider {
819 &self.right().blockchain_db
820 }
821
822 pub async fn with_components<CB>(
824 self,
825 components_builder: CB,
826 on_component_initialized: Box<
827 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
828 >,
829 ) -> eyre::Result<
830 LaunchContextWith<
831 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
832 >,
833 >
834 where
835 CB: NodeComponentsBuilder<T>,
836 {
837 let head = self.lookup_head()?;
839
840 let builder_ctx = BuilderContext::new(
841 head,
842 self.blockchain_db().clone(),
843 self.task_executor().clone(),
844 self.configs().clone(),
845 );
846
847 debug!(target: "reth::cli", "creating components");
848 let components = components_builder.build_components(&builder_ctx).await?;
849
850 let blockchain_db = self.blockchain_db().clone();
851
852 let node_adapter = NodeAdapter {
853 components,
854 task_executor: self.task_executor().clone(),
855 provider: blockchain_db,
856 };
857
858 debug!(target: "reth::cli", "calling on_component_initialized hook");
859 on_component_initialized.on_event(node_adapter.clone())?;
860
861 let components_container = WithComponents {
862 db_provider_container: WithMeteredProvider {
863 provider_factory: self.provider_factory().clone(),
864 metrics_sender: self.sync_metrics_tx(),
865 },
866 node_adapter,
867 head,
868 };
869
870 let ctx = LaunchContextWith {
871 inner: self.inner,
872 attachment: self.attachment.map_right(|_| components_container),
873 };
874
875 Ok(ctx)
876 }
877}
878
879impl<T, CB>
880 LaunchContextWith<
881 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
882 >
883where
884 T: FullNodeTypes<Types: NodeTypesForProvider>,
885 CB: NodeComponentsBuilder<T>,
886{
887 pub const fn provider_factory(
889 &self,
890 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
891 &self.right().db_provider_container.provider_factory
892 }
893
894 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
897 where
898 C: HeadersClient<Header: BlockHeader>,
899 {
900 self.node_config().max_block(client, self.provider_factory().clone()).await
901 }
902
903 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
905 self.provider_factory().static_file_provider()
906 }
907
908 pub fn static_file_producer(
910 &self,
911 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
912 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
913 }
914
915 pub const fn head(&self) -> Head {
917 self.right().head
918 }
919
920 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
922 &self.right().node_adapter
923 }
924
925 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
927 &mut self.right_mut().node_adapter
928 }
929
930 pub const fn blockchain_db(&self) -> &T::Provider {
932 &self.node_adapter().provider
933 }
934
935 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
941 let mut initial_target = self.node_config().debug.tip;
942
943 if initial_target.is_none() {
944 initial_target = self.check_pipeline_consistency()?;
945 }
946
947 Ok(initial_target)
948 }
949
950 pub const fn terminate_after_initial_backfill(&self) -> bool {
956 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
957 }
958
959 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
964 if self.chain_spec().is_optimism() &&
965 !self.is_dev() &&
966 self.chain_id() == Chain::optimism_mainnet()
967 {
968 let latest = self.blockchain_db().last_block_number()?;
969 if latest < 105235063 {
971 error!(
972 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
973 );
974 return Err(ProviderError::BestBlockNotFound);
975 }
976 }
977
978 Ok(())
979 }
980
981 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
993 let era_enabled = self.era_import_source().is_some();
995 let mut all_stages =
996 StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
997
998 let first_stage = all_stages.next().expect("there must be at least one stage");
1000
1001 let first_stage_checkpoint = self
1004 .blockchain_db()
1005 .get_stage_checkpoint(first_stage)?
1006 .unwrap_or_default()
1007 .block_number;
1008
1009 for stage_id in all_stages {
1011 let stage_checkpoint = self
1012 .blockchain_db()
1013 .get_stage_checkpoint(stage_id)?
1014 .unwrap_or_default()
1015 .block_number;
1016
1017 debug!(
1020 target: "consensus::engine",
1021 first_stage_id = %first_stage,
1022 first_stage_checkpoint,
1023 stage_id = %stage_id,
1024 stage_checkpoint = stage_checkpoint,
1025 "Checking stage against first stage",
1026 );
1027 if stage_checkpoint < first_stage_checkpoint {
1028 debug!(
1029 target: "consensus::engine",
1030 first_stage_id = %first_stage,
1031 first_stage_checkpoint,
1032 inconsistent_stage_id = %stage_id,
1033 inconsistent_stage_checkpoint = stage_checkpoint,
1034 "Pipeline sync progress is inconsistent"
1035 );
1036 return self.blockchain_db().block_hash(first_stage_checkpoint);
1037 }
1038 }
1039
1040 self.ensure_chain_specific_db_checks()?;
1041
1042 Ok(None)
1043 }
1044
1045 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
1047 self.right().db_provider_container.metrics_sender.clone()
1048 }
1049
1050 pub const fn components(&self) -> &CB::Components {
1052 &self.node_adapter().components
1053 }
1054
1055 #[expect(clippy::type_complexity)]
1057 pub async fn launch_exex(
1058 &self,
1059 installed_exex: Vec<(
1060 String,
1061 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1062 )>,
1063 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1064 self.exex_launcher(installed_exex).launch().await
1065 }
1066
1067 #[expect(clippy::type_complexity)]
1079 pub fn exex_launcher(
1080 &self,
1081 installed_exex: Vec<(
1082 String,
1083 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1084 )>,
1085 ) -> ExExLauncher<NodeAdapter<T, CB::Components>> {
1086 ExExLauncher::new(
1087 self.head(),
1088 self.node_adapter().clone(),
1089 installed_exex,
1090 self.configs().clone(),
1091 )
1092 }
1093
1094 pub fn era_import_source(&self) -> Option<EraImportSource> {
1098 let node_config = self.node_config();
1099 if !node_config.era.enabled {
1100 return None;
1101 }
1102
1103 EraImportSource::maybe_new(
1104 node_config.era.source.path.clone(),
1105 node_config.era.source.url.clone(),
1106 || node_config.chain.chain().kind().default_era_host(),
1107 || node_config.datadir().data_dir().join("era").into(),
1108 )
1109 }
1110
1111 pub fn consensus_layer_events(
1119 &self,
1120 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1121 where
1122 T::Provider: reth_provider::CanonChainTracker,
1123 {
1124 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1125 Either::Left(
1126 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1127 .map(Into::into),
1128 )
1129 } else {
1130 Either::Right(stream::empty())
1131 }
1132 }
1133
1134 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1136 where
1137 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1138 + Send
1139 + Unpin
1140 + 'static,
1141 {
1142 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1143
1144 let network = self.components().network().clone();
1145 let pool = self.components().pool().clone();
1146 let provider = self.node_adapter().provider.clone();
1147
1148 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1149
1150 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1151
1152 let ethstats_for_events = ethstats.clone();
1154 let task_executor = self.task_executor().clone();
1155 task_executor.spawn_task(async move {
1156 while let Some(event) = engine_events.next().await {
1157 use reth_engine_primitives::ConsensusEngineEvent;
1158 match event {
1159 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1160 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1161 let block_hash = executed.recovered_block.num_hash().hash;
1162 let block_number = executed.recovered_block.num_hash().number;
1163 if let Err(e) = ethstats_for_events
1164 .report_new_payload(block_hash, block_number, duration)
1165 .await
1166 {
1167 debug!(
1168 target: "ethstats",
1169 "Failed to report new payload: {}", e
1170 );
1171 }
1172 }
1173 _ => {
1174 }
1176 }
1177 }
1178 });
1179
1180 task_executor.spawn_task(async move { ethstats.run().await });
1182
1183 Ok(())
1184 }
1185}
1186
/// A pair of two values, referred to as the left and the right value.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new pair from the given left and right values.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Applies `f` to the left value, keeping the right value as is.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Applies `f` to the right value, keeping the left value as is.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Returns a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Returns a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Returns a mutable reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Returns a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}
1240
1241#[derive(Debug)]
1244pub struct WithConfigs<ChainSpec> {
1245 pub config: NodeConfig<ChainSpec>,
1247 pub toml_config: reth_config::Config,
1249}
1250
1251impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1252 fn clone(&self) -> Self {
1253 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1254 }
1255}
1256
/// A [`ProviderFactory`] together with the sender for stage [`MetricEvent`]s.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    /// The provider factory.
    provider_factory: ProviderFactory<N>,
    /// Sender half of the channel carrying stage metric events.
    metrics_sender: UnboundedSender<MetricEvent>,
}
1264
/// A [`WithMeteredProvider`] together with the node's blockchain provider.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    /// The provider factory and the metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The blockchain provider.
    blockchain_db: T::Provider,
}
1275
/// A [`WithMeteredProvider`] together with the node adapter (holding the built components) and
/// the head.
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    /// The provider factory and the metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The node adapter holding the components built by `CB`.
    node_adapter: NodeAdapter<T, CB::Components>,
    /// The head stored alongside the components.
    head: Head,
}
1287
1288pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1290 Hooks::builder()
1291 .with_hook({
1292 let db = provider_factory.db_ref().clone();
1293 move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1294 })
1295 .with_hook({
1296 let sfp = provider_factory.static_file_provider();
1297 move || {
1298 throttle!(Duration::from_secs(5 * 60), || {
1299 if let Err(error) = sfp.report_metrics() {
1300 error!(%error, "Failed to report metrics from static file provider");
1301 }
1302 })
1303 }
1304 })
1305 .with_hook({
1306 let rocksdb = provider_factory.rocksdb_provider();
1307 move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1308 })
1309 .build()
1310}
1311
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    const EXTENSION: &str = "toml";

    /// Runs `proc` with the path `<tempdir>/<filename>.toml` inside a fresh temporary
    /// directory and closes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();

        let mut config_path = temp_dir.path().join(filename);
        config_path.set_extension(EXTENSION);

        proc(&config_path);

        temp_dir.close().unwrap()
    }

    /// Saving the pruning config must produce a file that loads back to the in-memory config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            // Only `full` is enabled; every other pruning argument is left unset.
            let pruning = PruningArgs {
                full: true,
                minimal: false,
                block_interval: None,
                minimum_distance: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_pre_merge: false,
                receipts_distance: None,
                receipts_before: None,
                receipts_log_filter: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                bodies_pre_merge: false,
                bodies_distance: None,
                bodies_before: None,
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut reth_config = Config::default();
            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
                .unwrap();

            let loaded_config = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, loaded_config);
        })
    }
}
1368}