reth_node_builder/launch/
common.rs

1//! Helper types that can be used by launchers.
2//!
3//! ## Launch Context Type System
4//!
5//! The node launch process uses a type-state pattern to ensure correct initialization
6//! order at compile time. Methods are only available when their prerequisites are met.
7//!
8//! ### Core Types
9//!
10//! - [`LaunchContext`]: Base context with executor and data directory
11//! - [`LaunchContextWith<T>`]: Context with an attached value of type `T`
12//! - [`Attached<L, R>`]: Pairs values, preserving both previous (L) and new (R) state
13//!
14//! ### Helper Attachments
15//!
16//! - [`WithConfigs`]: Node config + TOML config
17//! - [`WithMeteredProvider`]: Provider factory with metrics
18//! - [`WithMeteredProviders`]: Provider factory + blockchain provider
19//! - [`WithComponents`]: Final form with all components
20//!
21//! ### Method Availability
22//!
23//! Methods are implemented on specific type combinations:
24//! - `impl<T> LaunchContextWith<T>`: Generic methods available for any attachment
25//! - `impl LaunchContextWith<WithConfigs>`: Config-specific methods
26//! - `impl LaunchContextWith<Attached<WithConfigs, DB>>`: Database operations
27//! - `impl LaunchContextWith<Attached<WithConfigs, ProviderFactory>>`: Provider operations
28//! - etc.
29//!
30//! This ensures correct initialization order without runtime checks.
31
32use crate::{
33    components::{NodeComponents, NodeComponentsBuilder},
34    hooks::OnComponentInitializedHook,
35    BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54    args::DefaultEraHost,
55    dirs::{ChainPath, DataDirPath},
56    node_config::NodeConfig,
57    primitives::BlockHeader,
58    version::version_metadata,
59};
60use reth_node_metrics::{
61    chain::ChainSpecInfo,
62    hooks::Hooks,
63    recorder::install_prometheus_recorder,
64    server::{MetricServer, MetricServerConfig},
65    version::VersionInfo,
66};
67use reth_provider::{
68    providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
69    BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
70    ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
71    StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77    sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78    StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::tracing::{debug, error, info, warn};
83use reth_transaction_pool::TransactionPool;
84use std::{sync::Arc, thread::available_parallelism};
85use tokio::sync::{
86    mpsc::{unbounded_channel, UnboundedSender},
87    oneshot, watch,
88};
89
90use futures::{future::Either, stream, Stream, StreamExt};
91use reth_node_ethstats::EthStatsService;
92use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
93
94/// Reusable setup for launching a node.
95///
96/// This is the entry point for the node launch process. It implements a builder
97/// pattern using type-state programming to enforce correct initialization order.
98///
99/// ## Type Evolution
100///
101/// Starting from `LaunchContext`, each method transforms the type to reflect
102/// accumulated state:
103///
104/// ```text
105/// LaunchContext
106///   └─> LaunchContextWith<WithConfigs>
107///       └─> LaunchContextWith<Attached<WithConfigs, DB>>
108///           └─> LaunchContextWith<Attached<WithConfigs, ProviderFactory>>
109///               └─> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders>>
110///                   └─> LaunchContextWith<Attached<WithConfigs, WithComponents>>
111/// ```
112#[derive(Debug, Clone)]
113pub struct LaunchContext {
114    /// The task executor for the node.
115    pub task_executor: TaskExecutor,
116    /// The data directory for the node.
117    pub data_dir: ChainPath<DataDirPath>,
118}
119
120impl LaunchContext {
121    /// Create a new instance of the default node launcher.
122    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
123        Self { task_executor, data_dir }
124    }
125
126    /// Create launch context with attachment.
127    pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
128        LaunchContextWith { inner: self, attachment }
129    }
130
131    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
132    /// `config`.
133    ///
134    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
135    pub fn with_loaded_toml_config<ChainSpec>(
136        self,
137        config: NodeConfig<ChainSpec>,
138    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
139    where
140        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
141    {
142        let toml_config = self.load_toml_config(&config)?;
143        Ok(self.with(WithConfigs { config, toml_config }))
144    }
145
146    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
147    /// `config`.
148    ///
149    /// This is async because the trusted peers may have to be resolved.
150    pub fn load_toml_config<ChainSpec>(
151        &self,
152        config: &NodeConfig<ChainSpec>,
153    ) -> eyre::Result<reth_config::Config>
154    where
155        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
156    {
157        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
158
159        let mut toml_config = reth_config::Config::from_path(&config_path)
160            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
161
162        Self::save_pruning_config(&mut toml_config, config, &config_path)?;
163
164        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
165
166        // Update the config with the command line arguments
167        toml_config.peers.trusted_nodes_only = config.network.trusted_only;
168
169        // Merge static file CLI arguments with config file, giving priority to CLI
170        toml_config.static_files = config.static_files.merge_with_config(toml_config.static_files);
171
172        Ok(toml_config)
173    }
174
175    /// Save prune config to the toml file if node is a full node or has custom pruning CLI
176    /// arguments. Also migrates deprecated prune config values to new defaults.
177    fn save_pruning_config<ChainSpec>(
178        reth_config: &mut reth_config::Config,
179        config: &NodeConfig<ChainSpec>,
180        config_path: impl AsRef<std::path::Path>,
181    ) -> eyre::Result<()>
182    where
183        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
184    {
185        let mut should_save = reth_config.prune.segments.migrate();
186
187        if let Some(prune_config) = config.prune_config() {
188            if reth_config.prune != prune_config {
189                reth_config.set_prune_config(prune_config);
190                should_save = true;
191            }
192        } else if !reth_config.prune.is_default() {
193            warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
194        }
195
196        if should_save {
197            info!(target: "reth::cli", "Saving prune config to toml file");
198            reth_config.save(config_path.as_ref())?;
199        }
200
201        Ok(())
202    }
203
204    /// Convenience function to [`Self::configure_globals`]
205    pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
206        self.configure_globals(reserved_cpu_cores);
207        self
208    }
209
210    /// Configure global settings this includes:
211    ///
212    /// - Raising the file descriptor limit
213    /// - Configuring the global rayon thread pool with available parallelism. Honoring
214    ///   engine.reserved-cpu-cores to reserve given number of cores for O while using at least 1
215    ///   core for the rayon thread pool
216    pub fn configure_globals(&self, reserved_cpu_cores: usize) {
217        // Raise the fd limit of the process.
218        // Does not do anything on windows.
219        match fdlimit::raise_fd_limit() {
220            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
221                debug!(from, to, "Raised file descriptor limit");
222            }
223            Ok(fdlimit::Outcome::Unsupported) => {}
224            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
225        }
226
227        // Reserving the given number of CPU cores for the rest of OS.
228        // Users can reserve more cores by setting engine.reserved-cpu-cores
229        // Note: The global rayon thread pool will use at least one core.
230        let num_threads = available_parallelism()
231            .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
232        if let Err(err) = ThreadPoolBuilder::new()
233            .num_threads(num_threads)
234            .thread_name(|i| format!("reth-rayon-{i}"))
235            .build_global()
236        {
237            warn!(%err, "Failed to build global thread pool")
238        }
239    }
240}
241
242/// A [`LaunchContext`] along with an additional value.
243///
244/// The type parameter `T` represents the current state of the launch process.
245/// Methods are conditionally implemented based on `T`, ensuring operations
246/// are only available when their prerequisites are met.
247///
248/// For example:
249/// - Config methods when `T = WithConfigs<ChainSpec>`
250/// - Database operations when `T = Attached<WithConfigs<ChainSpec>, DB>`
251/// - Provider operations when `T = Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>`
252#[derive(Debug, Clone)]
253pub struct LaunchContextWith<T> {
254    /// The wrapped launch context.
255    pub inner: LaunchContext,
256    /// The additional attached value.
257    pub attachment: T,
258}
259
260impl<T> LaunchContextWith<T> {
261    /// Configure global settings this includes:
262    ///
263    /// - Raising the file descriptor limit
264    /// - Configuring the global rayon thread pool
265    pub fn configure_globals(&self, reserved_cpu_cores: u64) {
266        self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
267    }
268
269    /// Returns the data directory.
270    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
271        &self.inner.data_dir
272    }
273
274    /// Returns the task executor.
275    pub const fn task_executor(&self) -> &TaskExecutor {
276        &self.inner.task_executor
277    }
278
279    /// Attaches another value to the launch context.
280    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
281        LaunchContextWith {
282            inner: self.inner,
283            attachment: Attached::new(self.attachment, attachment),
284        }
285    }
286
287    /// Consumes the type and calls a function with a reference to the context.
288    // Returns the context again
289    pub fn inspect<F>(self, f: F) -> Self
290    where
291        F: FnOnce(&Self),
292    {
293        f(&self);
294        self
295    }
296}
297
298impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
299    /// Resolves the trusted peers and adds them to the toml config.
300    pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
301        if !self.attachment.config.network.trusted_peers.is_empty() {
302            info!(target: "reth::cli", "Adding trusted nodes");
303
304            self.attachment
305                .toml_config
306                .peers
307                .trusted_nodes
308                .extend(self.attachment.config.network.trusted_peers.clone());
309        }
310        Ok(self)
311    }
312}
313
314impl<L, R> LaunchContextWith<Attached<L, R>> {
315    /// Get a reference to the left value.
316    pub const fn left(&self) -> &L {
317        &self.attachment.left
318    }
319
320    /// Get a reference to the right value.
321    pub const fn right(&self) -> &R {
322        &self.attachment.right
323    }
324
325    /// Get a mutable reference to the left value.
326    pub const fn left_mut(&mut self) -> &mut L {
327        &mut self.attachment.left
328    }
329
330    /// Get a mutable reference to the right value.
331    pub const fn right_mut(&mut self) -> &mut R {
332        &mut self.attachment.right
333    }
334}
335impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
336    /// Adjust certain settings in the config to make sure they are set correctly
337    ///
338    /// This includes:
339    /// - Making sure the ETL dir is set to the datadir
340    /// - RPC settings are adjusted to the correct port
341    pub fn with_adjusted_configs(self) -> Self {
342        self.ensure_etl_datadir().with_adjusted_instance_ports()
343    }
344
345    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
346    pub fn ensure_etl_datadir(mut self) -> Self {
347        if self.toml_config_mut().stages.etl.dir.is_none() {
348            let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
349            if etl_path.exists() {
350                // Remove etl-path files on launch
351                if let Err(err) = fs::remove_dir_all(&etl_path) {
352                    warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
353                }
354            }
355            self.toml_config_mut().stages.etl.dir = Some(etl_path);
356        }
357
358        self
359    }
360
361    /// Change rpc port numbers based on the instance number.
362    pub fn with_adjusted_instance_ports(mut self) -> Self {
363        self.node_config_mut().adjust_instance_ports();
364        self
365    }
366
367    /// Returns the container for all config types
368    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
369        self.attachment.left()
370    }
371
372    /// Returns the attached [`NodeConfig`].
373    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
374        &self.left().config
375    }
376
377    /// Returns the attached [`NodeConfig`].
378    pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
379        &mut self.left_mut().config
380    }
381
382    /// Returns the attached toml config [`reth_config::Config`].
383    pub const fn toml_config(&self) -> &reth_config::Config {
384        &self.left().toml_config
385    }
386
387    /// Returns the attached toml config [`reth_config::Config`].
388    pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
389        &mut self.left_mut().toml_config
390    }
391
392    /// Returns the configured chain spec.
393    pub fn chain_spec(&self) -> Arc<ChainSpec> {
394        self.node_config().chain.clone()
395    }
396
397    /// Get the hash of the genesis block.
398    pub fn genesis_hash(&self) -> B256 {
399        self.node_config().chain.genesis_hash()
400    }
401
402    /// Returns the chain identifier of the node.
403    pub fn chain_id(&self) -> Chain {
404        self.node_config().chain.chain()
405    }
406
407    /// Returns true if the node is configured as --dev
408    pub const fn is_dev(&self) -> bool {
409        self.node_config().dev.dev
410    }
411
412    /// Returns the configured [`PruneConfig`]
413    ///
414    /// Any configuration set in CLI will take precedence over those set in toml
415    pub fn prune_config(&self) -> PruneConfig
416    where
417        ChainSpec: reth_chainspec::EthereumHardforks,
418    {
419        let Some(mut node_prune_config) = self.node_config().prune_config() else {
420            // No CLI config is set, use the toml config.
421            return self.toml_config().prune.clone();
422        };
423
424        // Otherwise, use the CLI configuration and merge with toml config.
425        node_prune_config.merge(self.toml_config().prune.clone());
426        node_prune_config
427    }
428
429    /// Returns the configured [`PruneModes`], returning the default if no config was available.
430    pub fn prune_modes(&self) -> PruneModes
431    where
432        ChainSpec: reth_chainspec::EthereumHardforks,
433    {
434        self.prune_config().segments
435    }
436
437    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
438    pub fn pruner_builder(&self) -> PrunerBuilder
439    where
440        ChainSpec: reth_chainspec::EthereumHardforks,
441    {
442        PrunerBuilder::new(self.prune_config())
443    }
444
445    /// Loads the JWT secret for the engine API
446    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
447        let default_jwt_path = self.data_dir().jwt();
448        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
449        Ok(secret)
450    }
451
452    /// Returns the [`MiningMode`] intended for --dev mode.
453    pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
454    where
455        Pool: TransactionPool + Unpin,
456    {
457        self.node_config().dev_mining_mode(pool)
458    }
459}
460
461impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
462where
463    DB: Database + Clone + 'static,
464    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
465{
466    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistent check
467    /// between the database and static files. **It may execute a pipeline unwind if it fails this
468    /// check.**
469    pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
470    where
471        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
472        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
473    {
474        // Validate static files configuration
475        let static_files_config = &self.toml_config().static_files;
476        static_files_config.validate()?;
477
478        // Apply per-segment blocks_per_file configuration
479        let static_file_provider =
480            StaticFileProviderBuilder::read_write(self.data_dir().static_files())?
481                .with_metrics()
482                .with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
483                .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
484                .build()?;
485
486        // Initialize RocksDB provider with metrics, statistics, and default tables
487        let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
488            .with_default_tables()
489            .with_metrics()
490            .with_statistics()
491            .build()?;
492
493        let factory = ProviderFactory::new(
494            self.right().clone(),
495            self.chain_spec(),
496            static_file_provider,
497            rocksdb_provider,
498        )?
499        .with_prune_modes(self.prune_modes());
500
501        // Keep MDBX, static files, and RocksDB aligned. If any check fails, unwind to the
502        // earliest consistent block.
503        //
504        // Order matters:
505        // 1) heal static files (no pruning)
506        // 2) check RocksDB (needs static-file tx data)
507        // 3) check static-file checkpoints vs MDBX (may prune)
508        //
509        // Compute one unwind target and run a single unwind.
510
511        let provider_ro = factory.database_provider_ro()?;
512
513        // Step 1: heal file-level inconsistencies (no pruning)
514        factory.static_file_provider().check_file_consistency(&provider_ro)?;
515
516        // Step 2: RocksDB consistency check (needs static files tx data)
517        let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?;
518
519        // Step 3: Static file checkpoint consistency (may prune)
520        let static_file_unwind = factory
521            .static_file_provider()
522            .check_consistency(&provider_ro)?
523            .map(|target| match target {
524                PipelineTarget::Unwind(block) => block,
525                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
526            });
527
528        // Take the minimum block number to ensure all storage layers are consistent.
529        let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
530
531        if let Some(unwind_block) = unwind_target {
532            // Highly unlikely to happen, and given its destructive nature, it's better to panic
533            // instead. Unwinding to 0 would leave MDBX with a huge free list size.
534            let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
535                (Some(_), Some(_)) => "RocksDB and static file",
536                (Some(_), None) => "RocksDB",
537                (None, Some(_)) => "static file",
538                (None, None) => unreachable!(),
539            };
540            assert_ne!(
541                unwind_block, 0,
542                "A {} inconsistency was found that would trigger an unwind to block 0",
543                inconsistency_source
544            );
545
546            let unwind_target = PipelineTarget::Unwind(unwind_block);
547
548            info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
549
550            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
551
552            // Builds an unwind-only pipeline
553            let pipeline = PipelineBuilder::default()
554                .add_stages(DefaultStages::new(
555                    factory.clone(),
556                    tip_rx,
557                    Arc::new(NoopConsensus::default()),
558                    NoopHeaderDownloader::default(),
559                    NoopBodiesDownloader::default(),
560                    NoopEvmConfig::<Evm>::default(),
561                    self.toml_config().stages.clone(),
562                    self.prune_modes(),
563                    None,
564                ))
565                .build(
566                    factory.clone(),
567                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
568                );
569
570            // Unwinds to block
571            let (tx, rx) = oneshot::channel();
572
573            // Pipeline should be run as blocking and panic if it fails.
574            self.task_executor().spawn_critical_blocking(
575                "pipeline task",
576                Box::pin(async move {
577                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
578                    let _ = tx.send(result);
579                }),
580            );
581            rx.await?.inspect_err(|err| {
582                error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
583            })?;
584        }
585
586        Ok(factory)
587    }
588
589    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
590    pub async fn with_provider_factory<N, Evm>(
591        self,
592    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
593    where
594        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
595        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
596    {
597        let factory = self.create_provider_factory::<N, Evm>().await?;
598        let ctx = LaunchContextWith {
599            inner: self.inner,
600            attachment: self.attachment.map_right(|_| factory),
601        };
602
603        Ok(ctx)
604    }
605}
606
607impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
608where
609    T: ProviderNodeTypes,
610{
611    /// Returns access to the underlying database.
612    pub const fn database(&self) -> &T::DB {
613        self.right().db_ref()
614    }
615
616    /// Returns the configured `ProviderFactory`.
617    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
618        self.right()
619    }
620
621    /// Returns the static file provider to interact with the static files.
622    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
623        self.right().static_file_provider()
624    }
625
626    /// This launches the prometheus endpoint.
627    ///
628    /// Convenience function to [`Self::start_prometheus_endpoint`]
629    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
630        self.start_prometheus_endpoint().await?;
631        Ok(self)
632    }
633
634    /// Starts the prometheus endpoint.
635    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
636        // ensure recorder runs upkeep periodically
637        install_prometheus_recorder().spawn_upkeep();
638
639        let listen_addr = self.node_config().metrics.prometheus;
640        if let Some(addr) = listen_addr {
641            let config = MetricServerConfig::new(
642                addr,
643                VersionInfo {
644                    version: version_metadata().cargo_pkg_version.as_ref(),
645                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
646                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
647                    git_sha: version_metadata().vergen_git_sha.as_ref(),
648                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
649                    build_profile: version_metadata().build_profile_name.as_ref(),
650                },
651                ChainSpecInfo { name: self.chain_id().to_string() },
652                self.task_executor().clone(),
653                Hooks::builder()
654                    .with_hook({
655                        let db = self.database().clone();
656                        move || db.report_metrics()
657                    })
658                    .with_hook({
659                        let sfp = self.static_file_provider();
660                        move || {
661                            if let Err(error) = sfp.report_metrics() {
662                                error!(%error, "Failed to report metrics for the static file provider");
663                            }
664                        }
665                    })
666                    .build(),
667            ).with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
668
669            MetricServer::new(config).serve().await?;
670        }
671
672        Ok(())
673    }
674
675    /// Convenience function to [`Self::init_genesis`]
676    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
677        init_genesis_with_settings(
678            self.provider_factory(),
679            self.node_config().static_files.to_settings(),
680        )?;
681        Ok(self)
682    }
683
684    /// Write the genesis block and state if it has not already been written
685    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
686        init_genesis_with_settings(
687            self.provider_factory(),
688            self.node_config().static_files.to_settings(),
689        )
690    }
691
692    /// Creates a new `WithMeteredProvider` container and attaches it to the
693    /// launch context.
694    ///
695    /// This spawns a metrics task that listens for metrics related events and updates metrics for
696    /// prometheus.
697    pub fn with_metrics_task(
698        self,
699    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
700        let (metrics_sender, metrics_receiver) = unbounded_channel();
701
702        let with_metrics =
703            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
704
705        debug!(target: "reth::cli", "Spawning stages metrics listener task");
706        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
707        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
708
709        LaunchContextWith {
710            inner: self.inner,
711            attachment: self.attachment.map_right(|_| with_metrics),
712        }
713    }
714}
715
716impl<N, DB>
717    LaunchContextWith<
718        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
719    >
720where
721    N: NodeTypes,
722    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
723{
724    /// Returns the configured `ProviderFactory`.
725    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
726        &self.right().provider_factory
727    }
728
729    /// Returns the metrics sender.
730    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
731        self.right().metrics_sender.clone()
732    }
733
734    /// Creates a `BlockchainProvider` and attaches it to the launch context.
735    #[expect(clippy::complexity)]
736    pub fn with_blockchain_db<T, F>(
737        self,
738        create_blockchain_provider: F,
739    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
740    where
741        T: FullNodeTypes<Types = N, DB = DB>,
742        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
743    {
744        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
745
746        let metered_providers = WithMeteredProviders {
747            db_provider_container: WithMeteredProvider {
748                provider_factory: self.provider_factory().clone(),
749                metrics_sender: self.sync_metrics_tx(),
750            },
751            blockchain_db,
752        };
753
754        let ctx = LaunchContextWith {
755            inner: self.inner,
756            attachment: self.attachment.map_right(|_| metered_providers),
757        };
758
759        Ok(ctx)
760    }
761}
762
763impl<T>
764    LaunchContextWith<
765        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
766    >
767where
768    T: FullNodeTypes<Types: NodeTypesForProvider>,
769{
770    /// Returns access to the underlying database.
771    pub const fn database(&self) -> &T::DB {
772        self.provider_factory().db_ref()
773    }
774
775    /// Returns the configured `ProviderFactory`.
776    pub const fn provider_factory(
777        &self,
778    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
779        &self.right().db_provider_container.provider_factory
780    }
781
782    /// Fetches the head block from the database.
783    ///
784    /// If the database is empty, returns the genesis block.
785    pub fn lookup_head(&self) -> eyre::Result<Head> {
786        self.node_config()
787            .lookup_head(self.provider_factory())
788            .wrap_err("the head block is missing")
789    }
790
791    /// Returns the metrics sender.
792    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
793        self.right().db_provider_container.metrics_sender.clone()
794    }
795
796    /// Returns a reference to the blockchain provider.
797    pub const fn blockchain_db(&self) -> &T::Provider {
798        &self.right().blockchain_db
799    }
800
801    /// Creates a `NodeAdapter` and attaches it to the launch context.
802    pub async fn with_components<CB>(
803        self,
804        components_builder: CB,
805        on_component_initialized: Box<
806            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
807        >,
808    ) -> eyre::Result<
809        LaunchContextWith<
810            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
811        >,
812    >
813    where
814        CB: NodeComponentsBuilder<T>,
815    {
816        // fetch the head block from the database
817        let head = self.lookup_head()?;
818
819        let builder_ctx = BuilderContext::new(
820            head,
821            self.blockchain_db().clone(),
822            self.task_executor().clone(),
823            self.configs().clone(),
824        );
825
826        debug!(target: "reth::cli", "creating components");
827        let components = components_builder.build_components(&builder_ctx).await?;
828
829        let blockchain_db = self.blockchain_db().clone();
830
831        let node_adapter = NodeAdapter {
832            components,
833            task_executor: self.task_executor().clone(),
834            provider: blockchain_db,
835        };
836
837        debug!(target: "reth::cli", "calling on_component_initialized hook");
838        on_component_initialized.on_event(node_adapter.clone())?;
839
840        let components_container = WithComponents {
841            db_provider_container: WithMeteredProvider {
842                provider_factory: self.provider_factory().clone(),
843                metrics_sender: self.sync_metrics_tx(),
844            },
845            node_adapter,
846            head,
847        };
848
849        let ctx = LaunchContextWith {
850            inner: self.inner,
851            attachment: self.attachment.map_right(|_| components_container),
852        };
853
854        Ok(ctx)
855    }
856}
857
858impl<T, CB>
859    LaunchContextWith<
860        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
861    >
862where
863    T: FullNodeTypes<Types: NodeTypesForProvider>,
864    CB: NodeComponentsBuilder<T>,
865{
866    /// Returns the configured `ProviderFactory`.
867    pub const fn provider_factory(
868        &self,
869    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
870        &self.right().db_provider_container.provider_factory
871    }
872
873    /// Returns the max block that the node should run to, looking it up from the network if
874    /// necessary
875    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
876    where
877        C: HeadersClient<Header: BlockHeader>,
878    {
879        self.node_config().max_block(client, self.provider_factory().clone()).await
880    }
881
882    /// Returns the static file provider to interact with the static files.
883    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
884        self.provider_factory().static_file_provider()
885    }
886
887    /// Creates a new [`StaticFileProducer`] with the attached database.
888    pub fn static_file_producer(
889        &self,
890    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
891        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
892    }
893
894    /// Returns the current head block.
895    pub const fn head(&self) -> Head {
896        self.right().head
897    }
898
899    /// Returns the configured `NodeAdapter`.
900    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
901        &self.right().node_adapter
902    }
903
904    /// Returns mutable reference to the configured `NodeAdapter`.
905    pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
906        &mut self.right_mut().node_adapter
907    }
908
909    /// Returns a reference to the blockchain provider.
910    pub const fn blockchain_db(&self) -> &T::Provider {
911        &self.node_adapter().provider
912    }
913
914    /// Returns the initial backfill to sync to at launch.
915    ///
916    /// This returns the configured `debug.tip` if set, otherwise it will check if backfill was
917    /// previously interrupted and returns the block hash of the last checkpoint, see also
918    /// [`Self::check_pipeline_consistency`]
919    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
920        let mut initial_target = self.node_config().debug.tip;
921
922        if initial_target.is_none() {
923            initial_target = self.check_pipeline_consistency()?;
924        }
925
926        Ok(initial_target)
927    }
928
929    /// Returns true if the node should terminate after the initial backfill run.
930    ///
931    /// This is the case if any of these configs are set:
932    ///  `--debug.max-block`
933    ///  `--debug.terminate`
934    pub const fn terminate_after_initial_backfill(&self) -> bool {
935        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
936    }
937
938    /// Ensures that the database matches chain-specific requirements.
939    ///
940    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
941    /// bedrock height)
942    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
943        if self.chain_spec().is_optimism() &&
944            !self.is_dev() &&
945            self.chain_id() == Chain::optimism_mainnet()
946        {
947            let latest = self.blockchain_db().last_block_number()?;
948            // bedrock height
949            if latest < 105235063 {
950                error!(
951                    "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
952                );
953                return Err(ProviderError::BestBlockNotFound)
954            }
955        }
956
957        Ok(())
958    }
959
960    /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
961    /// than the checkpoint of the first stage).
962    ///
963    /// This will return the pipeline target if:
964    ///  * the pipeline was interrupted during its previous run
965    ///  * a new stage was added
966    ///  * stage data was dropped manually through `reth stage drop ...`
967    ///
968    /// # Returns
969    ///
970    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
971    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
972        // We skip the era stage if it's not enabled
973        let era_enabled = self.era_import_source().is_some();
974        let mut all_stages =
975            StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
976
977        // Get the expected first stage based on config.
978        let first_stage = all_stages.next().expect("there must be at least one stage");
979
980        // If no target was provided, check if the stages are congruent - check if the
981        // checkpoint of the last stage matches the checkpoint of the first.
982        let first_stage_checkpoint = self
983            .blockchain_db()
984            .get_stage_checkpoint(first_stage)?
985            .unwrap_or_default()
986            .block_number;
987
988        // Compare all other stages against the first
989        for stage_id in all_stages {
990            let stage_checkpoint = self
991                .blockchain_db()
992                .get_stage_checkpoint(stage_id)?
993                .unwrap_or_default()
994                .block_number;
995
996            // If the checkpoint of any stage is less than the checkpoint of the first stage,
997            // retrieve and return the block hash of the latest header and use it as the target.
998            debug!(
999                target: "consensus::engine",
1000                first_stage_id = %first_stage,
1001                first_stage_checkpoint,
1002                stage_id = %stage_id,
1003                stage_checkpoint = stage_checkpoint,
1004                "Checking stage against first stage",
1005            );
1006            if stage_checkpoint < first_stage_checkpoint {
1007                debug!(
1008                    target: "consensus::engine",
1009                    first_stage_id = %first_stage,
1010                    first_stage_checkpoint,
1011                    inconsistent_stage_id = %stage_id,
1012                    inconsistent_stage_checkpoint = stage_checkpoint,
1013                    "Pipeline sync progress is inconsistent"
1014                );
1015                return self.blockchain_db().block_hash(first_stage_checkpoint);
1016            }
1017        }
1018
1019        self.ensure_chain_specific_db_checks()?;
1020
1021        Ok(None)
1022    }
1023
1024    /// Returns the metrics sender.
1025    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
1026        self.right().db_provider_container.metrics_sender.clone()
1027    }
1028
1029    /// Returns the node adapter components.
1030    pub const fn components(&self) -> &CB::Components {
1031        &self.node_adapter().components
1032    }
1033
1034    /// Launches ExEx (Execution Extensions) and returns the ExEx manager handle.
1035    #[allow(clippy::type_complexity)]
1036    pub async fn launch_exex(
1037        &self,
1038        installed_exex: Vec<(
1039            String,
1040            Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1041        )>,
1042    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1043        ExExLauncher::new(
1044            self.head(),
1045            self.node_adapter().clone(),
1046            installed_exex,
1047            self.configs().clone(),
1048        )
1049        .launch()
1050        .await
1051    }
1052
1053    /// Creates the ERA import source based on node configuration.
1054    ///
1055    /// Returns `Some(EraImportSource)` if ERA is enabled in the node config, otherwise `None`.
1056    pub fn era_import_source(&self) -> Option<EraImportSource> {
1057        let node_config = self.node_config();
1058        if !node_config.era.enabled {
1059            return None;
1060        }
1061
1062        EraImportSource::maybe_new(
1063            node_config.era.source.path.clone(),
1064            node_config.era.source.url.clone(),
1065            || node_config.chain.chain().kind().default_era_host(),
1066            || node_config.datadir().data_dir().join("era").into(),
1067        )
1068    }
1069
1070    /// Creates consensus layer health events stream based on node configuration.
1071    ///
1072    /// Returns a stream that monitors consensus layer health if:
1073    /// - No debug tip is configured
1074    /// - Not running in dev mode
1075    ///
1076    /// Otherwise returns an empty stream.
1077    pub fn consensus_layer_events(
1078        &self,
1079    ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1080    where
1081        T::Provider: reth_provider::CanonChainTracker,
1082    {
1083        if self.node_config().debug.tip.is_none() && !self.is_dev() {
1084            Either::Left(
1085                ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1086                    .map(Into::into),
1087            )
1088        } else {
1089            Either::Right(stream::empty())
1090        }
1091    }
1092
1093    /// Spawns the [`EthStatsService`] service if configured.
1094    pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1095    where
1096        St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1097            + Send
1098            + Unpin
1099            + 'static,
1100    {
1101        let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1102
1103        let network = self.components().network().clone();
1104        let pool = self.components().pool().clone();
1105        let provider = self.node_adapter().provider.clone();
1106
1107        info!(target: "reth::cli", "Starting EthStats service at {}", url);
1108
1109        let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1110
1111        // If engine events are provided, spawn listener for new payload reporting
1112        let ethstats_for_events = ethstats.clone();
1113        let task_executor = self.task_executor().clone();
1114        task_executor.spawn(Box::pin(async move {
1115            while let Some(event) = engine_events.next().await {
1116                use reth_engine_primitives::ConsensusEngineEvent;
1117                match event {
1118                    ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1119                    ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1120                        let block_hash = executed.recovered_block.num_hash().hash;
1121                        let block_number = executed.recovered_block.num_hash().number;
1122                        if let Err(e) = ethstats_for_events
1123                            .report_new_payload(block_hash, block_number, duration)
1124                            .await
1125                        {
1126                            debug!(
1127                                target: "ethstats",
1128                                "Failed to report new payload: {}", e
1129                            );
1130                        }
1131                    }
1132                    _ => {
1133                        // Ignore other event types for ethstats reporting
1134                    }
1135                }
1136            }
1137        }));
1138
1139        // Spawn main ethstats service
1140        task_executor.spawn(Box::pin(async move { ethstats.run().await }));
1141
1142        Ok(())
1143    }
1144}
1145
1146/// Joins two attachments together, preserving access to both values.
1147///
1148/// This type enables the launch process to accumulate state while maintaining
1149/// access to all previously attached components. The `left` field holds the
1150/// previous state, while `right` holds the newly attached component.
1151#[derive(Clone, Copy, Debug)]
1152pub struct Attached<L, R> {
1153    left: L,
1154    right: R,
1155}
1156
1157impl<L, R> Attached<L, R> {
1158    /// Creates a new `Attached` with the given values.
1159    pub const fn new(left: L, right: R) -> Self {
1160        Self { left, right }
1161    }
1162
1163    /// Maps the left value to a new value.
1164    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1165    where
1166        F: FnOnce(L) -> T,
1167    {
1168        Attached::new(f(self.left), self.right)
1169    }
1170
1171    /// Maps the right value to a new value.
1172    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1173    where
1174        F: FnOnce(R) -> T,
1175    {
1176        Attached::new(self.left, f(self.right))
1177    }
1178
1179    /// Get a reference to the left value.
1180    pub const fn left(&self) -> &L {
1181        &self.left
1182    }
1183
1184    /// Get a reference to the right value.
1185    pub const fn right(&self) -> &R {
1186        &self.right
1187    }
1188
1189    /// Get a mutable reference to the left value.
1190    pub const fn left_mut(&mut self) -> &mut L {
1191        &mut self.left
1192    }
1193
1194    /// Get a mutable reference to the right value.
1195    pub const fn right_mut(&mut self) -> &mut R {
1196        &mut self.right
1197    }
1198}
1199
1200/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
1201/// reth.toml config
1202#[derive(Debug)]
1203pub struct WithConfigs<ChainSpec> {
1204    /// The configured, usually derived from the CLI.
1205    pub config: NodeConfig<ChainSpec>,
1206    /// The loaded reth.toml config.
1207    pub toml_config: reth_config::Config,
1208}
1209
1210impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1211    fn clone(&self) -> Self {
1212        Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1213    }
1214}
1215
1216/// Helper container type to bundle the [`ProviderFactory`] and the metrics
1217/// sender.
1218#[derive(Debug, Clone)]
1219pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1220    provider_factory: ProviderFactory<N>,
1221    metrics_sender: UnboundedSender<MetricEvent>,
1222}
1223
1224/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
1225/// and a metrics sender.
1226#[expect(missing_debug_implementations)]
1227pub struct WithMeteredProviders<T>
1228where
1229    T: FullNodeTypes,
1230{
1231    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1232    blockchain_db: T::Provider,
1233}
1234
1235/// Helper container to bundle the metered providers container and [`NodeAdapter`].
1236#[expect(missing_debug_implementations)]
1237pub struct WithComponents<T, CB>
1238where
1239    T: FullNodeTypes,
1240    CB: NodeComponentsBuilder<T>,
1241{
1242    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1243    node_adapter: NodeAdapter<T, CB::Components>,
1244    head: Head,
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249    use super::{LaunchContext, NodeConfig};
1250    use reth_config::Config;
1251    use reth_node_core::args::PruningArgs;
1252
1253    const EXTENSION: &str = "toml";
1254
1255    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1256        let temp_dir = tempfile::tempdir().unwrap();
1257        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1258        proc(&config_path);
1259        temp_dir.close().unwrap()
1260    }
1261
1262    #[test]
1263    fn test_save_prune_config() {
1264        with_tempdir("prune-store-test", |config_path| {
1265            let mut reth_config = Config::default();
1266            let node_config = NodeConfig {
1267                pruning: PruningArgs {
1268                    full: true,
1269                    block_interval: None,
1270                    sender_recovery_full: false,
1271                    sender_recovery_distance: None,
1272                    sender_recovery_before: None,
1273                    transaction_lookup_full: false,
1274                    transaction_lookup_distance: None,
1275                    transaction_lookup_before: None,
1276                    receipts_full: false,
1277                    receipts_pre_merge: false,
1278                    receipts_distance: None,
1279                    receipts_before: None,
1280                    account_history_full: false,
1281                    account_history_distance: None,
1282                    account_history_before: None,
1283                    storage_history_full: false,
1284                    storage_history_distance: None,
1285                    storage_history_before: None,
1286                    bodies_pre_merge: false,
1287                    bodies_distance: None,
1288                    receipts_log_filter: None,
1289                    bodies_before: None,
1290                },
1291                ..NodeConfig::test()
1292            };
1293            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
1294                .unwrap();
1295
1296            let loaded_config = Config::from_path(config_path).unwrap();
1297
1298            assert_eq!(reth_config, loaded_config);
1299        })
1300    }
1301}