1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_consensus::BlockHeader as _;
38use alloy_eips::eip2124::Head;
39use alloy_primitives::{BlockNumber, B256};
40use eyre::Context;
41use rayon::ThreadPoolBuilder;
42use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
43use reth_config::{config::EtlConfig, PruneConfig};
44use reth_consensus::noop::NoopConsensus;
45use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
46use reth_db_common::init::{init_genesis, InitStorageError};
47use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
48use reth_engine_local::MiningMode;
49use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
50use reth_exex::ExExManagerHandle;
51use reth_fs_util as fs;
52use reth_network_p2p::headers::client::HeadersClient;
53use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
54use reth_node_core::{
55 args::DefaultEraHost,
56 dirs::{ChainPath, DataDirPath},
57 node_config::NodeConfig,
58 primitives::BlockHeader,
59 version::version_metadata,
60};
61use reth_node_metrics::{
62 chain::ChainSpecInfo,
63 hooks::Hooks,
64 recorder::install_prometheus_recorder,
65 server::{MetricServer, MetricServerConfig},
66 version::VersionInfo,
67};
68use reth_provider::{
69 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
70 BlockHashReader, BlockNumReader, BlockReaderIdExt, ChainSpecProvider, ProviderError,
71 ProviderFactory, ProviderResult, StageCheckpointReader, StateProviderFactory,
72 StaticFileProviderFactory,
73};
74use reth_prune::{PruneModes, PrunerBuilder};
75use reth_rpc_builder::config::RethRpcServerConfig;
76use reth_rpc_layer::JwtSecret;
77use reth_stages::{
78 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
79 StageId,
80};
81use reth_static_file::StaticFileProducer;
82use reth_tasks::TaskExecutor;
83use reth_tracing::tracing::{debug, error, info, warn};
84use reth_transaction_pool::TransactionPool;
85use std::{sync::Arc, thread::available_parallelism};
86use tokio::sync::{
87 mpsc::{unbounded_channel, UnboundedSender},
88 oneshot, watch,
89};
90
91use futures::{future::Either, stream, Stream, StreamExt};
92use reth_node_ethstats::EthStatsService;
93use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
94
95#[derive(Debug, Clone)]
114pub struct LaunchContext {
115 pub task_executor: TaskExecutor,
117 pub data_dir: ChainPath<DataDirPath>,
119}
120
121impl LaunchContext {
122 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
124 Self { task_executor, data_dir }
125 }
126
127 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
129 LaunchContextWith { inner: self, attachment }
130 }
131
132 pub fn with_loaded_toml_config<ChainSpec>(
137 self,
138 config: NodeConfig<ChainSpec>,
139 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
140 where
141 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
142 {
143 let toml_config = self.load_toml_config(&config)?;
144 Ok(self.with(WithConfigs { config, toml_config }))
145 }
146
147 pub fn load_toml_config<ChainSpec>(
152 &self,
153 config: &NodeConfig<ChainSpec>,
154 ) -> eyre::Result<reth_config::Config>
155 where
156 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
157 {
158 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
159
160 let mut toml_config = reth_config::Config::from_path(&config_path)
161 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
162
163 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
164
165 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
166
167 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
169
170 Ok(toml_config)
171 }
172
173 fn save_pruning_config_if_full_node<ChainSpec>(
175 reth_config: &mut reth_config::Config,
176 config: &NodeConfig<ChainSpec>,
177 config_path: impl AsRef<std::path::Path>,
178 ) -> eyre::Result<()>
179 where
180 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
181 {
182 if reth_config.prune.is_none() {
183 if let Some(prune_config) = config.prune_config() {
184 reth_config.update_prune_config(prune_config);
185 info!(target: "reth::cli", "Saving prune config to toml file");
186 reth_config.save(config_path.as_ref())?;
187 }
188 } else if config.prune_config().is_none() {
189 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
190 }
191 Ok(())
192 }
193
194 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
196 self.configure_globals(reserved_cpu_cores);
197 self
198 }
199
200 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
207 match fdlimit::raise_fd_limit() {
210 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
211 debug!(from, to, "Raised file descriptor limit");
212 }
213 Ok(fdlimit::Outcome::Unsupported) => {}
214 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
215 }
216
217 let num_threads = available_parallelism()
221 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
222 if let Err(err) = ThreadPoolBuilder::new()
223 .num_threads(num_threads)
224 .thread_name(|i| format!("reth-rayon-{i}"))
225 .build_global()
226 {
227 warn!(%err, "Failed to build global thread pool")
228 }
229 }
230}
231
232#[derive(Debug, Clone)]
243pub struct LaunchContextWith<T> {
244 pub inner: LaunchContext,
246 pub attachment: T,
248}
249
250impl<T> LaunchContextWith<T> {
251 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
256 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
257 }
258
259 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
261 &self.inner.data_dir
262 }
263
264 pub const fn task_executor(&self) -> &TaskExecutor {
266 &self.inner.task_executor
267 }
268
269 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
271 LaunchContextWith {
272 inner: self.inner,
273 attachment: Attached::new(self.attachment, attachment),
274 }
275 }
276
277 pub fn inspect<F>(self, f: F) -> Self
280 where
281 F: FnOnce(&Self),
282 {
283 f(&self);
284 self
285 }
286}
287
288impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
289 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
291 if !self.attachment.config.network.trusted_peers.is_empty() {
292 info!(target: "reth::cli", "Adding trusted nodes");
293
294 self.attachment
295 .toml_config
296 .peers
297 .trusted_nodes
298 .extend(self.attachment.config.network.trusted_peers.clone());
299 }
300 Ok(self)
301 }
302}
303
304impl<L, R> LaunchContextWith<Attached<L, R>> {
305 pub const fn left(&self) -> &L {
307 &self.attachment.left
308 }
309
310 pub const fn right(&self) -> &R {
312 &self.attachment.right
313 }
314
315 pub const fn left_mut(&mut self) -> &mut L {
317 &mut self.attachment.left
318 }
319
320 pub const fn right_mut(&mut self) -> &mut R {
322 &mut self.attachment.right
323 }
324}
325impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
326 pub fn with_adjusted_configs(self) -> Self {
332 self.ensure_etl_datadir().with_adjusted_instance_ports()
333 }
334
335 pub fn ensure_etl_datadir(mut self) -> Self {
337 if self.toml_config_mut().stages.etl.dir.is_none() {
338 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
339 if etl_path.exists() {
340 if let Err(err) = fs::remove_dir_all(&etl_path) {
342 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
343 }
344 }
345 self.toml_config_mut().stages.etl.dir = Some(etl_path);
346 }
347
348 self
349 }
350
351 pub fn with_adjusted_instance_ports(mut self) -> Self {
353 self.node_config_mut().adjust_instance_ports();
354 self
355 }
356
357 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
359 self.attachment.left()
360 }
361
362 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
364 &self.left().config
365 }
366
367 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
369 &mut self.left_mut().config
370 }
371
372 pub const fn toml_config(&self) -> &reth_config::Config {
374 &self.left().toml_config
375 }
376
377 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
379 &mut self.left_mut().toml_config
380 }
381
382 pub fn chain_spec(&self) -> Arc<ChainSpec> {
384 self.node_config().chain.clone()
385 }
386
387 pub fn genesis_hash(&self) -> B256 {
389 self.node_config().chain.genesis_hash()
390 }
391
392 pub fn chain_id(&self) -> Chain {
394 self.node_config().chain.chain()
395 }
396
397 pub const fn is_dev(&self) -> bool {
399 self.node_config().dev.dev
400 }
401
402 pub fn prune_config(&self) -> Option<PruneConfig>
406 where
407 ChainSpec: reth_chainspec::EthereumHardforks,
408 {
409 let Some(mut node_prune_config) = self.node_config().prune_config() else {
410 return self.toml_config().prune.clone();
412 };
413
414 node_prune_config.merge(self.toml_config().prune.clone());
416 Some(node_prune_config)
417 }
418
419 pub fn prune_modes(&self) -> PruneModes
421 where
422 ChainSpec: reth_chainspec::EthereumHardforks,
423 {
424 self.prune_config().map(|config| config.segments).unwrap_or_default()
425 }
426
427 pub fn pruner_builder(&self) -> PrunerBuilder
429 where
430 ChainSpec: reth_chainspec::EthereumHardforks,
431 {
432 PrunerBuilder::new(self.prune_config().unwrap_or_default())
433 .delete_limit(self.chain_spec().prune_delete_limit())
434 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
435 }
436
437 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
439 let default_jwt_path = self.data_dir().jwt();
440 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
441 Ok(secret)
442 }
443
444 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
446 where
447 Pool: TransactionPool + Unpin,
448 {
449 if let Some(interval) = self.node_config().dev.block_time {
450 MiningMode::interval(interval)
451 } else {
452 MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
453 }
454 }
455}
456
457impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
458where
459 DB: Database + Clone + 'static,
460 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
461{
462 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
466 where
467 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
468 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
469 {
470 let factory = ProviderFactory::new(
471 self.right().clone(),
472 self.chain_spec(),
473 StaticFileProvider::read_write(self.data_dir().static_files())?,
474 )
475 .with_prune_modes(self.prune_modes())
476 .with_static_files_metrics();
477
478 let has_receipt_pruning =
479 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
480
481 if let Some(unwind_target) = factory
484 .static_file_provider()
485 .check_consistency(&factory.provider()?, has_receipt_pruning)?
486 {
487 assert_ne!(
490 unwind_target,
491 PipelineTarget::Unwind(0),
492 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
493 );
494
495 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
496
497 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
498
499 let pipeline = PipelineBuilder::default()
501 .add_stages(DefaultStages::new(
502 factory.clone(),
503 tip_rx,
504 Arc::new(NoopConsensus::default()),
505 NoopHeaderDownloader::default(),
506 NoopBodiesDownloader::default(),
507 NoopEvmConfig::<Evm>::default(),
508 self.toml_config().stages.clone(),
509 self.prune_modes(),
510 None,
511 ))
512 .build(
513 factory.clone(),
514 StaticFileProducer::new(factory.clone(), self.prune_modes()),
515 );
516
517 let (tx, rx) = oneshot::channel();
519
520 self.task_executor().spawn_critical_blocking(
522 "pipeline task",
523 Box::pin(async move {
524 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
525 let _ = tx.send(result);
526 }),
527 );
528 rx.await?.inspect_err(|err| {
529 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
530 })?;
531 }
532
533 Ok(factory)
534 }
535
536 pub async fn with_provider_factory<N, Evm>(
538 self,
539 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
540 where
541 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
542 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
543 {
544 let factory = self.create_provider_factory::<N, Evm>().await?;
545 let ctx = LaunchContextWith {
546 inner: self.inner,
547 attachment: self.attachment.map_right(|_| factory),
548 };
549
550 Ok(ctx)
551 }
552}
553
554impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
555where
556 T: ProviderNodeTypes,
557{
558 pub const fn database(&self) -> &T::DB {
560 self.right().db_ref()
561 }
562
563 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
565 self.right()
566 }
567
568 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
570 self.right().static_file_provider()
571 }
572
573 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
577 self.start_prometheus_endpoint().await?;
578 Ok(self)
579 }
580
581 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
583 install_prometheus_recorder().spawn_upkeep();
585
586 let listen_addr = self.node_config().metrics;
587 if let Some(addr) = listen_addr {
588 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
589 let config = MetricServerConfig::new(
590 addr,
591 VersionInfo {
592 version: version_metadata().cargo_pkg_version.as_ref(),
593 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
594 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
595 git_sha: version_metadata().vergen_git_sha.as_ref(),
596 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
597 build_profile: version_metadata().build_profile_name.as_ref(),
598 },
599 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
600 self.task_executor().clone(),
601 Hooks::builder()
602 .with_hook({
603 let db = self.database().clone();
604 move || db.report_metrics()
605 })
606 .with_hook({
607 let sfp = self.static_file_provider();
608 move || {
609 if let Err(error) = sfp.report_metrics() {
610 error!(%error, "Failed to report metrics for the static file provider");
611 }
612 }
613 })
614 .build(),
615 );
616
617 MetricServer::new(config).serve().await?;
618 }
619
620 Ok(())
621 }
622
623 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
625 init_genesis(self.provider_factory())?;
626 Ok(self)
627 }
628
629 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
631 init_genesis(self.provider_factory())
632 }
633
634 pub fn with_metrics_task(
640 self,
641 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
642 let (metrics_sender, metrics_receiver) = unbounded_channel();
643
644 let with_metrics =
645 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
646
647 debug!(target: "reth::cli", "Spawning stages metrics listener task");
648 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
649 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
650
651 LaunchContextWith {
652 inner: self.inner,
653 attachment: self.attachment.map_right(|_| with_metrics),
654 }
655 }
656}
657
658impl<N, DB>
659 LaunchContextWith<
660 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
661 >
662where
663 N: NodeTypes,
664 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
665{
666 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
668 &self.right().provider_factory
669 }
670
671 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
673 self.right().metrics_sender.clone()
674 }
675
676 #[expect(clippy::complexity)]
678 pub fn with_blockchain_db<T, F>(
679 self,
680 create_blockchain_provider: F,
681 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
682 where
683 T: FullNodeTypes<Types = N, DB = DB>,
684 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
685 {
686 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
687
688 let metered_providers = WithMeteredProviders {
689 db_provider_container: WithMeteredProvider {
690 provider_factory: self.provider_factory().clone(),
691 metrics_sender: self.sync_metrics_tx(),
692 },
693 blockchain_db,
694 };
695
696 let ctx = LaunchContextWith {
697 inner: self.inner,
698 attachment: self.attachment.map_right(|_| metered_providers),
699 };
700
701 Ok(ctx)
702 }
703}
704
705impl<T>
706 LaunchContextWith<
707 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
708 >
709where
710 T: FullNodeTypes<Types: NodeTypesForProvider>,
711{
712 pub const fn database(&self) -> &T::DB {
714 self.provider_factory().db_ref()
715 }
716
717 pub const fn provider_factory(
719 &self,
720 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
721 &self.right().db_provider_container.provider_factory
722 }
723
724 pub fn lookup_head(&self) -> eyre::Result<Head> {
728 self.node_config()
729 .lookup_head(self.provider_factory())
730 .wrap_err("the head block is missing")
731 }
732
733 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
735 self.right().db_provider_container.metrics_sender.clone()
736 }
737
738 pub const fn blockchain_db(&self) -> &T::Provider {
740 &self.right().blockchain_db
741 }
742
743 pub async fn with_components<CB>(
745 self,
746 components_builder: CB,
747 on_component_initialized: Box<
748 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
749 >,
750 ) -> eyre::Result<
751 LaunchContextWith<
752 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
753 >,
754 >
755 where
756 CB: NodeComponentsBuilder<T>,
757 {
758 let head = self.lookup_head()?;
760
761 let builder_ctx = BuilderContext::new(
762 head,
763 self.blockchain_db().clone(),
764 self.task_executor().clone(),
765 self.configs().clone(),
766 );
767
768 debug!(target: "reth::cli", "creating components");
769 let components = components_builder.build_components(&builder_ctx).await?;
770
771 let blockchain_db = self.blockchain_db().clone();
772
773 let node_adapter = NodeAdapter {
774 components,
775 task_executor: self.task_executor().clone(),
776 provider: blockchain_db,
777 };
778
779 debug!(target: "reth::cli", "calling on_component_initialized hook");
780 on_component_initialized.on_event(node_adapter.clone())?;
781
782 let components_container = WithComponents {
783 db_provider_container: WithMeteredProvider {
784 provider_factory: self.provider_factory().clone(),
785 metrics_sender: self.sync_metrics_tx(),
786 },
787 node_adapter,
788 head,
789 };
790
791 let ctx = LaunchContextWith {
792 inner: self.inner,
793 attachment: self.attachment.map_right(|_| components_container),
794 };
795
796 Ok(ctx)
797 }
798}
799
800impl<T, CB>
801 LaunchContextWith<
802 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
803 >
804where
805 T: FullNodeTypes<Types: NodeTypesForProvider>,
806 CB: NodeComponentsBuilder<T>,
807{
808 pub const fn provider_factory(
810 &self,
811 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
812 &self.right().db_provider_container.provider_factory
813 }
814
815 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
818 where
819 C: HeadersClient<Header: BlockHeader>,
820 {
821 self.node_config().max_block(client, self.provider_factory().clone()).await
822 }
823
824 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
826 self.provider_factory().static_file_provider()
827 }
828
829 pub fn static_file_producer(
831 &self,
832 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
833 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
834 }
835
836 pub const fn head(&self) -> Head {
838 self.right().head
839 }
840
841 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
843 &self.right().node_adapter
844 }
845
846 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
848 &mut self.right_mut().node_adapter
849 }
850
851 pub const fn blockchain_db(&self) -> &T::Provider {
853 &self.node_adapter().provider
854 }
855
856 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
862 let mut initial_target = self.node_config().debug.tip;
863
864 if initial_target.is_none() {
865 initial_target = self.check_pipeline_consistency()?;
866 }
867
868 Ok(initial_target)
869 }
870
871 pub const fn terminate_after_initial_backfill(&self) -> bool {
877 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
878 }
879
880 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
885 if self.chain_spec().is_optimism() &&
886 !self.is_dev() &&
887 self.chain_id() == Chain::optimism_mainnet()
888 {
889 let latest = self.blockchain_db().last_block_number()?;
890 if latest < 105235063 {
892 error!(
893 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
894 );
895 return Err(ProviderError::BestBlockNotFound)
896 }
897 }
898
899 Ok(())
900 }
901
902 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
914 let first_stage_checkpoint = self
917 .blockchain_db()
918 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
919 .unwrap_or_default()
920 .block_number;
921
922 for stage_id in StageId::ALL.iter().skip(1) {
925 let stage_checkpoint = self
926 .blockchain_db()
927 .get_stage_checkpoint(*stage_id)?
928 .unwrap_or_default()
929 .block_number;
930
931 if stage_checkpoint < first_stage_checkpoint {
934 debug!(
935 target: "consensus::engine",
936 first_stage_checkpoint,
937 inconsistent_stage_id = %stage_id,
938 inconsistent_stage_checkpoint = stage_checkpoint,
939 "Pipeline sync progress is inconsistent"
940 );
941 return self.blockchain_db().block_hash(first_stage_checkpoint);
942 }
943 }
944
945 self.ensure_chain_specific_db_checks()?;
946
947 Ok(None)
948 }
949
950 pub fn expire_pre_merge_transactions(&self) -> eyre::Result<()>
956 where
957 T: FullNodeTypes<Provider: StaticFileProviderFactory>,
958 {
959 if self.node_config().pruning.bodies_pre_merge {
960 if let Some(merge_block) =
961 self.chain_spec().ethereum_fork_activation(EthereumHardfork::Paris).block_number()
962 {
963 let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
965 if latest.number() > merge_block {
966 let provider = self.blockchain_db().static_file_provider();
967 if provider.get_lowest_transaction_static_file_block() < Some(merge_block) {
968 info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
969 provider.delete_transactions_below(merge_block)?;
970 } else {
971 debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
972 }
973 }
974 }
975 }
976
977 Ok(())
978 }
979
980 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
982 self.right().db_provider_container.metrics_sender.clone()
983 }
984
985 pub const fn components(&self) -> &CB::Components {
987 &self.node_adapter().components
988 }
989
990 #[allow(clippy::type_complexity)]
992 pub async fn launch_exex(
993 &self,
994 installed_exex: Vec<(
995 String,
996 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
997 )>,
998 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
999 ExExLauncher::new(
1000 self.head(),
1001 self.node_adapter().clone(),
1002 installed_exex,
1003 self.configs().clone(),
1004 )
1005 .launch()
1006 .await
1007 }
1008
1009 pub fn era_import_source(&self) -> Option<EraImportSource> {
1013 let node_config = self.node_config();
1014 if !node_config.era.enabled {
1015 return None;
1016 }
1017
1018 EraImportSource::maybe_new(
1019 node_config.era.source.path.clone(),
1020 node_config.era.source.url.clone(),
1021 || node_config.chain.chain().kind().default_era_host(),
1022 || node_config.datadir().data_dir().join("era").into(),
1023 )
1024 }
1025
1026 pub fn consensus_layer_events(
1034 &self,
1035 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1036 where
1037 T::Provider: reth_provider::CanonChainTracker,
1038 {
1039 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1040 Either::Left(
1041 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1042 .map(Into::into),
1043 )
1044 } else {
1045 Either::Right(stream::empty())
1046 }
1047 }
1048
1049 pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1051 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1052
1053 let network = self.components().network().clone();
1054 let pool = self.components().pool().clone();
1055 let provider = self.node_adapter().provider.clone();
1056
1057 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1058
1059 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1060 tokio::spawn(async move { ethstats.run().await });
1061
1062 Ok(())
1063 }
1064}
1065
1066impl<T, CB>
1067 LaunchContextWith<
1068 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
1069 >
1070where
1071 T: FullNodeTypes<
1072 Provider: StateProviderFactory + ChainSpecProvider,
1073 Types: NodeTypesForProvider,
1074 >,
1075 CB: NodeComponentsBuilder<T>,
1076{
1077}
1078
1079#[derive(Clone, Copy, Debug)]
1085pub struct Attached<L, R> {
1086 left: L,
1087 right: R,
1088}
1089
1090impl<L, R> Attached<L, R> {
1091 pub const fn new(left: L, right: R) -> Self {
1093 Self { left, right }
1094 }
1095
1096 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1098 where
1099 F: FnOnce(L) -> T,
1100 {
1101 Attached::new(f(self.left), self.right)
1102 }
1103
1104 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1106 where
1107 F: FnOnce(R) -> T,
1108 {
1109 Attached::new(self.left, f(self.right))
1110 }
1111
1112 pub const fn left(&self) -> &L {
1114 &self.left
1115 }
1116
1117 pub const fn right(&self) -> &R {
1119 &self.right
1120 }
1121
1122 pub const fn left_mut(&mut self) -> &mut L {
1124 &mut self.left
1125 }
1126
1127 pub const fn right_mut(&mut self) -> &mut R {
1129 &mut self.right
1130 }
1131}
1132
1133#[derive(Debug)]
1136pub struct WithConfigs<ChainSpec> {
1137 pub config: NodeConfig<ChainSpec>,
1139 pub toml_config: reth_config::Config,
1141}
1142
1143impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1144 fn clone(&self) -> Self {
1145 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1146 }
1147}
1148
1149#[derive(Debug, Clone)]
1152pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1153 provider_factory: ProviderFactory<N>,
1154 metrics_sender: UnboundedSender<MetricEvent>,
1155}
1156
1157#[expect(missing_debug_implementations)]
1160pub struct WithMeteredProviders<T>
1161where
1162 T: FullNodeTypes,
1163{
1164 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1165 blockchain_db: T::Provider,
1166}
1167
1168#[expect(missing_debug_implementations)]
1170pub struct WithComponents<T, CB>
1171where
1172 T: FullNodeTypes,
1173 CB: NodeComponentsBuilder<T>,
1174{
1175 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1176 node_adapter: NodeAdapter<T, CB::Components>,
1177 head: Head,
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182 use super::{LaunchContext, NodeConfig};
1183 use reth_config::Config;
1184 use reth_node_core::args::PruningArgs;
1185
1186 const EXTENSION: &str = "toml";
1187
1188 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1189 let temp_dir = tempfile::tempdir().unwrap();
1190 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1191 proc(&config_path);
1192 temp_dir.close().unwrap()
1193 }
1194
1195 #[test]
1196 fn test_save_prune_config() {
1197 with_tempdir("prune-store-test", |config_path| {
1198 let mut reth_config = Config::default();
1199 let node_config = NodeConfig {
1200 pruning: PruningArgs {
1201 full: true,
1202 block_interval: None,
1203 sender_recovery_full: false,
1204 sender_recovery_distance: None,
1205 sender_recovery_before: None,
1206 transaction_lookup_full: false,
1207 transaction_lookup_distance: None,
1208 transaction_lookup_before: None,
1209 receipts_full: false,
1210 receipts_pre_merge: false,
1211 receipts_distance: None,
1212 receipts_before: None,
1213 account_history_full: false,
1214 account_history_distance: None,
1215 account_history_before: None,
1216 storage_history_full: false,
1217 storage_history_distance: None,
1218 storage_history_before: None,
1219 bodies_pre_merge: false,
1220 bodies_distance: None,
1221 receipts_log_filter: None,
1222 bodies_before: None,
1223 },
1224 ..NodeConfig::test()
1225 };
1226 LaunchContext::save_pruning_config_if_full_node(
1227 &mut reth_config,
1228 &node_config,
1229 config_path,
1230 )
1231 .unwrap();
1232
1233 let loaded_config = Config::from_path(config_path).unwrap();
1234
1235 assert_eq!(reth_config, loaded_config);
1236 })
1237 }
1238}