reth_node_builder/launch/
common.rs

1//! Helper types that can be used by launchers.
2//!
3//! ## Launch Context Type System
4//!
5//! The node launch process uses a type-state pattern to ensure correct initialization
6//! order at compile time. Methods are only available when their prerequisites are met.
7//!
8//! ### Core Types
9//!
10//! - [`LaunchContext`]: Base context with executor and data directory
11//! - [`LaunchContextWith<T>`]: Context with an attached value of type `T`
12//! - [`Attached<L, R>`]: Pairs values, preserving both previous (L) and new (R) state
13//!
14//! ### Helper Attachments
15//!
16//! - [`WithConfigs`]: Node config + TOML config
17//! - [`WithMeteredProvider`]: Provider factory with metrics
18//! - [`WithMeteredProviders`]: Provider factory + blockchain provider
19//! - [`WithComponents`]: Final form with all components
20//!
21//! ### Method Availability
22//!
23//! Methods are implemented on specific type combinations:
24//! - `impl<T> LaunchContextWith<T>`: Generic methods available for any attachment
25//! - `impl LaunchContextWith<WithConfigs>`: Config-specific methods
26//! - `impl LaunchContextWith<Attached<WithConfigs, DB>>`: Database operations
27//! - `impl LaunchContextWith<Attached<WithConfigs, ProviderFactory>>`: Provider operations
28//! - etc.
29//!
30//! This ensures correct initialization order without runtime checks.
31
32use crate::{
33    components::{NodeComponents, NodeComponentsBuilder},
34    hooks::OnComponentInitializedHook,
35    BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54    args::DefaultEraHost,
55    dirs::{ChainPath, DataDirPath},
56    node_config::NodeConfig,
57    primitives::BlockHeader,
58    version::version_metadata,
59};
60use reth_node_metrics::{
61    chain::ChainSpecInfo,
62    hooks::Hooks,
63    recorder::install_prometheus_recorder,
64    server::{MetricServer, MetricServerConfig},
65    version::VersionInfo,
66};
67use reth_provider::{
68    providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
69    BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70    StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory,
71};
72use reth_prune::{PruneModes, PrunerBuilder};
73use reth_rpc_builder::config::RethRpcServerConfig;
74use reth_rpc_layer::JwtSecret;
75use reth_stages::{
76    sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
77    StageId,
78};
79use reth_static_file::StaticFileProducer;
80use reth_tasks::TaskExecutor;
81use reth_tracing::tracing::{debug, error, info, warn};
82use reth_transaction_pool::TransactionPool;
83use std::{sync::Arc, thread::available_parallelism};
84use tokio::sync::{
85    mpsc::{unbounded_channel, UnboundedSender},
86    oneshot, watch,
87};
88
89use futures::{future::Either, stream, Stream, StreamExt};
90use reth_node_ethstats::EthStatsService;
91use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
92
93/// Reusable setup for launching a node.
94///
95/// This is the entry point for the node launch process. It implements a builder
96/// pattern using type-state programming to enforce correct initialization order.
97///
98/// ## Type Evolution
99///
100/// Starting from `LaunchContext`, each method transforms the type to reflect
101/// accumulated state:
102///
103/// ```text
104/// LaunchContext
105///   └─> LaunchContextWith<WithConfigs>
106///       └─> LaunchContextWith<Attached<WithConfigs, DB>>
107///           └─> LaunchContextWith<Attached<WithConfigs, ProviderFactory>>
108///               └─> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders>>
109///                   └─> LaunchContextWith<Attached<WithConfigs, WithComponents>>
110/// ```
111#[derive(Debug, Clone)]
112pub struct LaunchContext {
113    /// The task executor for the node.
114    pub task_executor: TaskExecutor,
115    /// The data directory for the node.
116    pub data_dir: ChainPath<DataDirPath>,
117}
118
119impl LaunchContext {
120    /// Create a new instance of the default node launcher.
121    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
122        Self { task_executor, data_dir }
123    }
124
125    /// Create launch context with attachment.
126    pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
127        LaunchContextWith { inner: self, attachment }
128    }
129
130    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
131    /// `config`.
132    ///
133    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
134    pub fn with_loaded_toml_config<ChainSpec>(
135        self,
136        config: NodeConfig<ChainSpec>,
137    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
138    where
139        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
140    {
141        let toml_config = self.load_toml_config(&config)?;
142        Ok(self.with(WithConfigs { config, toml_config }))
143    }
144
145    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
146    /// `config`.
147    ///
148    /// This is async because the trusted peers may have to be resolved.
149    pub fn load_toml_config<ChainSpec>(
150        &self,
151        config: &NodeConfig<ChainSpec>,
152    ) -> eyre::Result<reth_config::Config>
153    where
154        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
155    {
156        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
157
158        let mut toml_config = reth_config::Config::from_path(&config_path)
159            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
160
161        Self::save_pruning_config(&mut toml_config, config, &config_path)?;
162
163        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
164
165        // Update the config with the command line arguments
166        toml_config.peers.trusted_nodes_only = config.network.trusted_only;
167
168        // Merge static file CLI arguments with config file, giving priority to CLI
169        toml_config.static_files = config.static_files.merge_with_config(toml_config.static_files);
170
171        Ok(toml_config)
172    }
173
174    /// Save prune config to the toml file if node is a full node or has custom pruning CLI
175    /// arguments. Also migrates deprecated prune config values to new defaults.
176    fn save_pruning_config<ChainSpec>(
177        reth_config: &mut reth_config::Config,
178        config: &NodeConfig<ChainSpec>,
179        config_path: impl AsRef<std::path::Path>,
180    ) -> eyre::Result<()>
181    where
182        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
183    {
184        let mut should_save = reth_config.prune.segments.migrate();
185
186        if let Some(prune_config) = config.prune_config() {
187            if reth_config.prune != prune_config {
188                reth_config.set_prune_config(prune_config);
189                should_save = true;
190            }
191        } else if !reth_config.prune.is_default() {
192            warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
193        }
194
195        if should_save {
196            info!(target: "reth::cli", "Saving prune config to toml file");
197            reth_config.save(config_path.as_ref())?;
198        }
199
200        Ok(())
201    }
202
203    /// Convenience function to [`Self::configure_globals`]
204    pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
205        self.configure_globals(reserved_cpu_cores);
206        self
207    }
208
209    /// Configure global settings this includes:
210    ///
211    /// - Raising the file descriptor limit
212    /// - Configuring the global rayon thread pool with available parallelism. Honoring
213    ///   engine.reserved-cpu-cores to reserve given number of cores for O while using at least 1
214    ///   core for the rayon thread pool
215    pub fn configure_globals(&self, reserved_cpu_cores: usize) {
216        // Raise the fd limit of the process.
217        // Does not do anything on windows.
218        match fdlimit::raise_fd_limit() {
219            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
220                debug!(from, to, "Raised file descriptor limit");
221            }
222            Ok(fdlimit::Outcome::Unsupported) => {}
223            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
224        }
225
226        // Reserving the given number of CPU cores for the rest of OS.
227        // Users can reserve more cores by setting engine.reserved-cpu-cores
228        // Note: The global rayon thread pool will use at least one core.
229        let num_threads = available_parallelism()
230            .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
231        if let Err(err) = ThreadPoolBuilder::new()
232            .num_threads(num_threads)
233            .thread_name(|i| format!("reth-rayon-{i}"))
234            .build_global()
235        {
236            warn!(%err, "Failed to build global thread pool")
237        }
238    }
239}
240
241/// A [`LaunchContext`] along with an additional value.
242///
243/// The type parameter `T` represents the current state of the launch process.
244/// Methods are conditionally implemented based on `T`, ensuring operations
245/// are only available when their prerequisites are met.
246///
247/// For example:
248/// - Config methods when `T = WithConfigs<ChainSpec>`
249/// - Database operations when `T = Attached<WithConfigs<ChainSpec>, DB>`
250/// - Provider operations when `T = Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>`
251#[derive(Debug, Clone)]
252pub struct LaunchContextWith<T> {
253    /// The wrapped launch context.
254    pub inner: LaunchContext,
255    /// The additional attached value.
256    pub attachment: T,
257}
258
259impl<T> LaunchContextWith<T> {
260    /// Configure global settings this includes:
261    ///
262    /// - Raising the file descriptor limit
263    /// - Configuring the global rayon thread pool
264    pub fn configure_globals(&self, reserved_cpu_cores: u64) {
265        self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
266    }
267
268    /// Returns the data directory.
269    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
270        &self.inner.data_dir
271    }
272
273    /// Returns the task executor.
274    pub const fn task_executor(&self) -> &TaskExecutor {
275        &self.inner.task_executor
276    }
277
278    /// Attaches another value to the launch context.
279    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
280        LaunchContextWith {
281            inner: self.inner,
282            attachment: Attached::new(self.attachment, attachment),
283        }
284    }
285
286    /// Consumes the type and calls a function with a reference to the context.
287    // Returns the context again
288    pub fn inspect<F>(self, f: F) -> Self
289    where
290        F: FnOnce(&Self),
291    {
292        f(&self);
293        self
294    }
295}
296
297impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
298    /// Resolves the trusted peers and adds them to the toml config.
299    pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
300        if !self.attachment.config.network.trusted_peers.is_empty() {
301            info!(target: "reth::cli", "Adding trusted nodes");
302
303            self.attachment
304                .toml_config
305                .peers
306                .trusted_nodes
307                .extend(self.attachment.config.network.trusted_peers.clone());
308        }
309        Ok(self)
310    }
311}
312
313impl<L, R> LaunchContextWith<Attached<L, R>> {
314    /// Get a reference to the left value.
315    pub const fn left(&self) -> &L {
316        &self.attachment.left
317    }
318
319    /// Get a reference to the right value.
320    pub const fn right(&self) -> &R {
321        &self.attachment.right
322    }
323
324    /// Get a mutable reference to the left value.
325    pub const fn left_mut(&mut self) -> &mut L {
326        &mut self.attachment.left
327    }
328
329    /// Get a mutable reference to the right value.
330    pub const fn right_mut(&mut self) -> &mut R {
331        &mut self.attachment.right
332    }
333}
334impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
335    /// Adjust certain settings in the config to make sure they are set correctly
336    ///
337    /// This includes:
338    /// - Making sure the ETL dir is set to the datadir
339    /// - RPC settings are adjusted to the correct port
340    pub fn with_adjusted_configs(self) -> Self {
341        self.ensure_etl_datadir().with_adjusted_instance_ports()
342    }
343
344    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
345    pub fn ensure_etl_datadir(mut self) -> Self {
346        if self.toml_config_mut().stages.etl.dir.is_none() {
347            let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
348            if etl_path.exists() {
349                // Remove etl-path files on launch
350                if let Err(err) = fs::remove_dir_all(&etl_path) {
351                    warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
352                }
353            }
354            self.toml_config_mut().stages.etl.dir = Some(etl_path);
355        }
356
357        self
358    }
359
360    /// Change rpc port numbers based on the instance number.
361    pub fn with_adjusted_instance_ports(mut self) -> Self {
362        self.node_config_mut().adjust_instance_ports();
363        self
364    }
365
366    /// Returns the container for all config types
367    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
368        self.attachment.left()
369    }
370
371    /// Returns the attached [`NodeConfig`].
372    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
373        &self.left().config
374    }
375
376    /// Returns the attached [`NodeConfig`].
377    pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
378        &mut self.left_mut().config
379    }
380
381    /// Returns the attached toml config [`reth_config::Config`].
382    pub const fn toml_config(&self) -> &reth_config::Config {
383        &self.left().toml_config
384    }
385
386    /// Returns the attached toml config [`reth_config::Config`].
387    pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
388        &mut self.left_mut().toml_config
389    }
390
391    /// Returns the configured chain spec.
392    pub fn chain_spec(&self) -> Arc<ChainSpec> {
393        self.node_config().chain.clone()
394    }
395
396    /// Get the hash of the genesis block.
397    pub fn genesis_hash(&self) -> B256 {
398        self.node_config().chain.genesis_hash()
399    }
400
401    /// Returns the chain identifier of the node.
402    pub fn chain_id(&self) -> Chain {
403        self.node_config().chain.chain()
404    }
405
406    /// Returns true if the node is configured as --dev
407    pub const fn is_dev(&self) -> bool {
408        self.node_config().dev.dev
409    }
410
411    /// Returns the configured [`PruneConfig`]
412    ///
413    /// Any configuration set in CLI will take precedence over those set in toml
414    pub fn prune_config(&self) -> PruneConfig
415    where
416        ChainSpec: reth_chainspec::EthereumHardforks,
417    {
418        let Some(mut node_prune_config) = self.node_config().prune_config() else {
419            // No CLI config is set, use the toml config.
420            return self.toml_config().prune.clone();
421        };
422
423        // Otherwise, use the CLI configuration and merge with toml config.
424        node_prune_config.merge(self.toml_config().prune.clone());
425        node_prune_config
426    }
427
428    /// Returns the configured [`PruneModes`], returning the default if no config was available.
429    pub fn prune_modes(&self) -> PruneModes
430    where
431        ChainSpec: reth_chainspec::EthereumHardforks,
432    {
433        self.prune_config().segments
434    }
435
436    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
437    pub fn pruner_builder(&self) -> PrunerBuilder
438    where
439        ChainSpec: reth_chainspec::EthereumHardforks,
440    {
441        PrunerBuilder::new(self.prune_config())
442    }
443
444    /// Loads the JWT secret for the engine API
445    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
446        let default_jwt_path = self.data_dir().jwt();
447        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
448        Ok(secret)
449    }
450
451    /// Returns the [`MiningMode`] intended for --dev mode.
452    pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
453    where
454        Pool: TransactionPool + Unpin,
455    {
456        if let Some(interval) = self.node_config().dev.block_time {
457            MiningMode::interval(interval)
458        } else {
459            MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
460        }
461    }
462}
463
464impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
465where
466    DB: Database + Clone + 'static,
467    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
468{
469    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistent check
470    /// between the database and static files. **It may execute a pipeline unwind if it fails this
471    /// check.**
472    pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
473    where
474        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
475        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
476    {
477        // Validate static files configuration
478        let static_files_config = &self.toml_config().static_files;
479        static_files_config.validate()?;
480
481        // Apply per-segment blocks_per_file configuration
482        let static_file_provider =
483            StaticFileProviderBuilder::read_write(self.data_dir().static_files())?
484                .with_metrics()
485                .with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
486                .build()?;
487
488        let factory =
489            ProviderFactory::new(self.right().clone(), self.chain_spec(), static_file_provider)?
490                .with_prune_modes(self.prune_modes());
491
492        // Check for consistency between database and static files. If it fails, it unwinds to
493        // the first block that's consistent between database and static files.
494        if let Some(unwind_target) =
495            factory.static_file_provider().check_consistency(&factory.provider()?)?
496        {
497            // Highly unlikely to happen, and given its destructive nature, it's better to panic
498            // instead.
499            assert_ne!(
500                unwind_target,
501                PipelineTarget::Unwind(0),
502                "A static file <> database inconsistency was found that would trigger an unwind to block 0"
503            );
504
505            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
506
507            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
508
509            // Builds an unwind-only pipeline
510            let pipeline = PipelineBuilder::default()
511                .add_stages(DefaultStages::new(
512                    factory.clone(),
513                    tip_rx,
514                    Arc::new(NoopConsensus::default()),
515                    NoopHeaderDownloader::default(),
516                    NoopBodiesDownloader::default(),
517                    NoopEvmConfig::<Evm>::default(),
518                    self.toml_config().stages.clone(),
519                    self.prune_modes(),
520                    None,
521                ))
522                .build(
523                    factory.clone(),
524                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
525                );
526
527            // Unwinds to block
528            let (tx, rx) = oneshot::channel();
529
530            // Pipeline should be run as blocking and panic if it fails.
531            self.task_executor().spawn_critical_blocking(
532                "pipeline task",
533                Box::pin(async move {
534                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
535                    let _ = tx.send(result);
536                }),
537            );
538            rx.await?.inspect_err(|err| {
539                error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
540            })?;
541        }
542
543        Ok(factory)
544    }
545
546    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
547    pub async fn with_provider_factory<N, Evm>(
548        self,
549    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
550    where
551        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
552        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
553    {
554        let factory = self.create_provider_factory::<N, Evm>().await?;
555        let ctx = LaunchContextWith {
556            inner: self.inner,
557            attachment: self.attachment.map_right(|_| factory),
558        };
559
560        Ok(ctx)
561    }
562}
563
564impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
565where
566    T: ProviderNodeTypes,
567{
568    /// Returns access to the underlying database.
569    pub const fn database(&self) -> &T::DB {
570        self.right().db_ref()
571    }
572
573    /// Returns the configured `ProviderFactory`.
574    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
575        self.right()
576    }
577
578    /// Returns the static file provider to interact with the static files.
579    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
580        self.right().static_file_provider()
581    }
582
583    /// This launches the prometheus endpoint.
584    ///
585    /// Convenience function to [`Self::start_prometheus_endpoint`]
586    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
587        self.start_prometheus_endpoint().await?;
588        Ok(self)
589    }
590
591    /// Starts the prometheus endpoint.
592    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
593        // ensure recorder runs upkeep periodically
594        install_prometheus_recorder().spawn_upkeep();
595
596        let listen_addr = self.node_config().metrics.prometheus;
597        if let Some(addr) = listen_addr {
598            let config = MetricServerConfig::new(
599                addr,
600                VersionInfo {
601                    version: version_metadata().cargo_pkg_version.as_ref(),
602                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
603                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
604                    git_sha: version_metadata().vergen_git_sha.as_ref(),
605                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
606                    build_profile: version_metadata().build_profile_name.as_ref(),
607                },
608                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
609                self.task_executor().clone(),
610                Hooks::builder()
611                    .with_hook({
612                        let db = self.database().clone();
613                        move || db.report_metrics()
614                    })
615                    .with_hook({
616                        let sfp = self.static_file_provider();
617                        move || {
618                            if let Err(error) = sfp.report_metrics() {
619                                error!(%error, "Failed to report metrics for the static file provider");
620                            }
621                        }
622                    })
623                    .build(),
624            ).with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
625
626            MetricServer::new(config).serve().await?;
627        }
628
629        Ok(())
630    }
631
632    /// Convenience function to [`Self::init_genesis`]
633    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
634        init_genesis_with_settings(
635            self.provider_factory(),
636            self.node_config().static_files.to_settings(),
637        )?;
638        Ok(self)
639    }
640
641    /// Write the genesis block and state if it has not already been written
642    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
643        init_genesis_with_settings(
644            self.provider_factory(),
645            self.node_config().static_files.to_settings(),
646        )
647    }
648
649    /// Creates a new `WithMeteredProvider` container and attaches it to the
650    /// launch context.
651    ///
652    /// This spawns a metrics task that listens for metrics related events and updates metrics for
653    /// prometheus.
654    pub fn with_metrics_task(
655        self,
656    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
657        let (metrics_sender, metrics_receiver) = unbounded_channel();
658
659        let with_metrics =
660            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
661
662        debug!(target: "reth::cli", "Spawning stages metrics listener task");
663        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
664        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
665
666        LaunchContextWith {
667            inner: self.inner,
668            attachment: self.attachment.map_right(|_| with_metrics),
669        }
670    }
671}
672
673impl<N, DB>
674    LaunchContextWith<
675        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
676    >
677where
678    N: NodeTypes,
679    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
680{
681    /// Returns the configured `ProviderFactory`.
682    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
683        &self.right().provider_factory
684    }
685
686    /// Returns the metrics sender.
687    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
688        self.right().metrics_sender.clone()
689    }
690
691    /// Creates a `BlockchainProvider` and attaches it to the launch context.
692    #[expect(clippy::complexity)]
693    pub fn with_blockchain_db<T, F>(
694        self,
695        create_blockchain_provider: F,
696    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
697    where
698        T: FullNodeTypes<Types = N, DB = DB>,
699        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
700    {
701        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
702
703        let metered_providers = WithMeteredProviders {
704            db_provider_container: WithMeteredProvider {
705                provider_factory: self.provider_factory().clone(),
706                metrics_sender: self.sync_metrics_tx(),
707            },
708            blockchain_db,
709        };
710
711        let ctx = LaunchContextWith {
712            inner: self.inner,
713            attachment: self.attachment.map_right(|_| metered_providers),
714        };
715
716        Ok(ctx)
717    }
718}
719
720impl<T>
721    LaunchContextWith<
722        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
723    >
724where
725    T: FullNodeTypes<Types: NodeTypesForProvider>,
726{
727    /// Returns access to the underlying database.
728    pub const fn database(&self) -> &T::DB {
729        self.provider_factory().db_ref()
730    }
731
732    /// Returns the configured `ProviderFactory`.
733    pub const fn provider_factory(
734        &self,
735    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
736        &self.right().db_provider_container.provider_factory
737    }
738
739    /// Fetches the head block from the database.
740    ///
741    /// If the database is empty, returns the genesis block.
742    pub fn lookup_head(&self) -> eyre::Result<Head> {
743        self.node_config()
744            .lookup_head(self.provider_factory())
745            .wrap_err("the head block is missing")
746    }
747
748    /// Returns the metrics sender.
749    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
750        self.right().db_provider_container.metrics_sender.clone()
751    }
752
753    /// Returns a reference to the blockchain provider.
754    pub const fn blockchain_db(&self) -> &T::Provider {
755        &self.right().blockchain_db
756    }
757
758    /// Creates a `NodeAdapter` and attaches it to the launch context.
759    pub async fn with_components<CB>(
760        self,
761        components_builder: CB,
762        on_component_initialized: Box<
763            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
764        >,
765    ) -> eyre::Result<
766        LaunchContextWith<
767            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
768        >,
769    >
770    where
771        CB: NodeComponentsBuilder<T>,
772    {
773        // fetch the head block from the database
774        let head = self.lookup_head()?;
775
776        let builder_ctx = BuilderContext::new(
777            head,
778            self.blockchain_db().clone(),
779            self.task_executor().clone(),
780            self.configs().clone(),
781        );
782
783        debug!(target: "reth::cli", "creating components");
784        let components = components_builder.build_components(&builder_ctx).await?;
785
786        let blockchain_db = self.blockchain_db().clone();
787
788        let node_adapter = NodeAdapter {
789            components,
790            task_executor: self.task_executor().clone(),
791            provider: blockchain_db,
792        };
793
794        debug!(target: "reth::cli", "calling on_component_initialized hook");
795        on_component_initialized.on_event(node_adapter.clone())?;
796
797        let components_container = WithComponents {
798            db_provider_container: WithMeteredProvider {
799                provider_factory: self.provider_factory().clone(),
800                metrics_sender: self.sync_metrics_tx(),
801            },
802            node_adapter,
803            head,
804        };
805
806        let ctx = LaunchContextWith {
807            inner: self.inner,
808            attachment: self.attachment.map_right(|_| components_container),
809        };
810
811        Ok(ctx)
812    }
813}
814
815impl<T, CB>
816    LaunchContextWith<
817        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
818    >
819where
820    T: FullNodeTypes<Types: NodeTypesForProvider>,
821    CB: NodeComponentsBuilder<T>,
822{
823    /// Returns the configured `ProviderFactory`.
824    pub const fn provider_factory(
825        &self,
826    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
827        &self.right().db_provider_container.provider_factory
828    }
829
830    /// Returns the max block that the node should run to, looking it up from the network if
831    /// necessary
832    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
833    where
834        C: HeadersClient<Header: BlockHeader>,
835    {
836        self.node_config().max_block(client, self.provider_factory().clone()).await
837    }
838
839    /// Returns the static file provider to interact with the static files.
840    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
841        self.provider_factory().static_file_provider()
842    }
843
844    /// Creates a new [`StaticFileProducer`] with the attached database.
845    pub fn static_file_producer(
846        &self,
847    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
848        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
849    }
850
851    /// Returns the current head block.
852    pub const fn head(&self) -> Head {
853        self.right().head
854    }
855
856    /// Returns the configured `NodeAdapter`.
857    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
858        &self.right().node_adapter
859    }
860
861    /// Returns mutable reference to the configured `NodeAdapter`.
862    pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
863        &mut self.right_mut().node_adapter
864    }
865
866    /// Returns a reference to the blockchain provider.
867    pub const fn blockchain_db(&self) -> &T::Provider {
868        &self.node_adapter().provider
869    }
870
871    /// Returns the initial backfill to sync to at launch.
872    ///
873    /// This returns the configured `debug.tip` if set, otherwise it will check if backfill was
874    /// previously interrupted and returns the block hash of the last checkpoint, see also
875    /// [`Self::check_pipeline_consistency`]
876    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
877        let mut initial_target = self.node_config().debug.tip;
878
879        if initial_target.is_none() {
880            initial_target = self.check_pipeline_consistency()?;
881        }
882
883        Ok(initial_target)
884    }
885
886    /// Returns true if the node should terminate after the initial backfill run.
887    ///
888    /// This is the case if any of these configs are set:
889    ///  `--debug.max-block`
890    ///  `--debug.terminate`
891    pub const fn terminate_after_initial_backfill(&self) -> bool {
892        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
893    }
894
895    /// Ensures that the database matches chain-specific requirements.
896    ///
897    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
898    /// bedrock height)
899    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
900        if self.chain_spec().is_optimism() &&
901            !self.is_dev() &&
902            self.chain_id() == Chain::optimism_mainnet()
903        {
904            let latest = self.blockchain_db().last_block_number()?;
905            // bedrock height
906            if latest < 105235063 {
907                error!(
908                    "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
909                );
910                return Err(ProviderError::BestBlockNotFound)
911            }
912        }
913
914        Ok(())
915    }
916
917    /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
918    /// than the checkpoint of the first stage).
919    ///
920    /// This will return the pipeline target if:
921    ///  * the pipeline was interrupted during its previous run
922    ///  * a new stage was added
923    ///  * stage data was dropped manually through `reth stage drop ...`
924    ///
925    /// # Returns
926    ///
927    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
928    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
929        // If no target was provided, check if the stages are congruent - check if the
930        // checkpoint of the last stage matches the checkpoint of the first.
931        let first_stage_checkpoint = self
932            .blockchain_db()
933            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
934            .unwrap_or_default()
935            .block_number;
936
937        // Skip the first stage as we've already retrieved it and comparing all other checkpoints
938        // against it.
939        for stage_id in StageId::ALL.iter().skip(1) {
940            let stage_checkpoint = self
941                .blockchain_db()
942                .get_stage_checkpoint(*stage_id)?
943                .unwrap_or_default()
944                .block_number;
945
946            // If the checkpoint of any stage is less than the checkpoint of the first stage,
947            // retrieve and return the block hash of the latest header and use it as the target.
948            if stage_checkpoint < first_stage_checkpoint {
949                debug!(
950                    target: "consensus::engine",
951                    first_stage_checkpoint,
952                    inconsistent_stage_id = %stage_id,
953                    inconsistent_stage_checkpoint = stage_checkpoint,
954                    "Pipeline sync progress is inconsistent"
955                );
956                return self.blockchain_db().block_hash(first_stage_checkpoint);
957            }
958        }
959
960        self.ensure_chain_specific_db_checks()?;
961
962        Ok(None)
963    }
964
965    /// Returns the metrics sender.
966    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
967        self.right().db_provider_container.metrics_sender.clone()
968    }
969
970    /// Returns the node adapter components.
971    pub const fn components(&self) -> &CB::Components {
972        &self.node_adapter().components
973    }
974
975    /// Launches ExEx (Execution Extensions) and returns the ExEx manager handle.
976    #[allow(clippy::type_complexity)]
977    pub async fn launch_exex(
978        &self,
979        installed_exex: Vec<(
980            String,
981            Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
982        )>,
983    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
984        ExExLauncher::new(
985            self.head(),
986            self.node_adapter().clone(),
987            installed_exex,
988            self.configs().clone(),
989        )
990        .launch()
991        .await
992    }
993
994    /// Creates the ERA import source based on node configuration.
995    ///
996    /// Returns `Some(EraImportSource)` if ERA is enabled in the node config, otherwise `None`.
997    pub fn era_import_source(&self) -> Option<EraImportSource> {
998        let node_config = self.node_config();
999        if !node_config.era.enabled {
1000            return None;
1001        }
1002
1003        EraImportSource::maybe_new(
1004            node_config.era.source.path.clone(),
1005            node_config.era.source.url.clone(),
1006            || node_config.chain.chain().kind().default_era_host(),
1007            || node_config.datadir().data_dir().join("era").into(),
1008        )
1009    }
1010
1011    /// Creates consensus layer health events stream based on node configuration.
1012    ///
1013    /// Returns a stream that monitors consensus layer health if:
1014    /// - No debug tip is configured
1015    /// - Not running in dev mode
1016    ///
1017    /// Otherwise returns an empty stream.
1018    pub fn consensus_layer_events(
1019        &self,
1020    ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1021    where
1022        T::Provider: reth_provider::CanonChainTracker,
1023    {
1024        if self.node_config().debug.tip.is_none() && !self.is_dev() {
1025            Either::Left(
1026                ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1027                    .map(Into::into),
1028            )
1029        } else {
1030            Either::Right(stream::empty())
1031        }
1032    }
1033
1034    /// Spawns the [`EthStatsService`] service if configured.
1035    pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1036        let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1037
1038        let network = self.components().network().clone();
1039        let pool = self.components().pool().clone();
1040        let provider = self.node_adapter().provider.clone();
1041
1042        info!(target: "reth::cli", "Starting EthStats service at {}", url);
1043
1044        let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1045        tokio::spawn(async move { ethstats.run().await });
1046
1047        Ok(())
1048    }
1049}
1050
1051/// Joins two attachments together, preserving access to both values.
1052///
1053/// This type enables the launch process to accumulate state while maintaining
1054/// access to all previously attached components. The `left` field holds the
1055/// previous state, while `right` holds the newly attached component.
1056#[derive(Clone, Copy, Debug)]
1057pub struct Attached<L, R> {
1058    left: L,
1059    right: R,
1060}
1061
1062impl<L, R> Attached<L, R> {
1063    /// Creates a new `Attached` with the given values.
1064    pub const fn new(left: L, right: R) -> Self {
1065        Self { left, right }
1066    }
1067
1068    /// Maps the left value to a new value.
1069    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1070    where
1071        F: FnOnce(L) -> T,
1072    {
1073        Attached::new(f(self.left), self.right)
1074    }
1075
1076    /// Maps the right value to a new value.
1077    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1078    where
1079        F: FnOnce(R) -> T,
1080    {
1081        Attached::new(self.left, f(self.right))
1082    }
1083
1084    /// Get a reference to the left value.
1085    pub const fn left(&self) -> &L {
1086        &self.left
1087    }
1088
1089    /// Get a reference to the right value.
1090    pub const fn right(&self) -> &R {
1091        &self.right
1092    }
1093
1094    /// Get a mutable reference to the left value.
1095    pub const fn left_mut(&mut self) -> &mut L {
1096        &mut self.left
1097    }
1098
1099    /// Get a mutable reference to the right value.
1100    pub const fn right_mut(&mut self) -> &mut R {
1101        &mut self.right
1102    }
1103}
1104
1105/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
1106/// reth.toml config
1107#[derive(Debug)]
1108pub struct WithConfigs<ChainSpec> {
1109    /// The configured, usually derived from the CLI.
1110    pub config: NodeConfig<ChainSpec>,
1111    /// The loaded reth.toml config.
1112    pub toml_config: reth_config::Config,
1113}
1114
1115impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1116    fn clone(&self) -> Self {
1117        Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1118    }
1119}
1120
1121/// Helper container type to bundle the [`ProviderFactory`] and the metrics
1122/// sender.
1123#[derive(Debug, Clone)]
1124pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1125    provider_factory: ProviderFactory<N>,
1126    metrics_sender: UnboundedSender<MetricEvent>,
1127}
1128
1129/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
1130/// and a metrics sender.
1131#[expect(missing_debug_implementations)]
1132pub struct WithMeteredProviders<T>
1133where
1134    T: FullNodeTypes,
1135{
1136    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1137    blockchain_db: T::Provider,
1138}
1139
1140/// Helper container to bundle the metered providers container and [`NodeAdapter`].
1141#[expect(missing_debug_implementations)]
1142pub struct WithComponents<T, CB>
1143where
1144    T: FullNodeTypes,
1145    CB: NodeComponentsBuilder<T>,
1146{
1147    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1148    node_adapter: NodeAdapter<T, CB::Components>,
1149    head: Head,
1150}
1151
1152#[cfg(test)]
1153mod tests {
1154    use super::{LaunchContext, NodeConfig};
1155    use reth_config::Config;
1156    use reth_node_core::args::PruningArgs;
1157
1158    const EXTENSION: &str = "toml";
1159
1160    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1161        let temp_dir = tempfile::tempdir().unwrap();
1162        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1163        proc(&config_path);
1164        temp_dir.close().unwrap()
1165    }
1166
1167    #[test]
1168    fn test_save_prune_config() {
1169        with_tempdir("prune-store-test", |config_path| {
1170            let mut reth_config = Config::default();
1171            let node_config = NodeConfig {
1172                pruning: PruningArgs {
1173                    full: true,
1174                    block_interval: None,
1175                    sender_recovery_full: false,
1176                    sender_recovery_distance: None,
1177                    sender_recovery_before: None,
1178                    transaction_lookup_full: false,
1179                    transaction_lookup_distance: None,
1180                    transaction_lookup_before: None,
1181                    receipts_full: false,
1182                    receipts_pre_merge: false,
1183                    receipts_distance: None,
1184                    receipts_before: None,
1185                    account_history_full: false,
1186                    account_history_distance: None,
1187                    account_history_before: None,
1188                    storage_history_full: false,
1189                    storage_history_distance: None,
1190                    storage_history_before: None,
1191                    bodies_pre_merge: false,
1192                    bodies_distance: None,
1193                    receipts_log_filter: None,
1194                    bodies_before: None,
1195                },
1196                ..NodeConfig::test()
1197            };
1198            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
1199                .unwrap();
1200
1201            let loaded_config = Config::from_path(config_path).unwrap();
1202
1203            assert_eq!(reth_config, loaded_config);
1204        })
1205    }
1206}