reth_node_builder/launch/
common.rs

1//! Helper types that can be used by launchers.
2//!
3//! ## Launch Context Type System
4//!
5//! The node launch process uses a type-state pattern to ensure correct initialization
6//! order at compile time. Methods are only available when their prerequisites are met.
7//!
8//! ### Core Types
9//!
10//! - [`LaunchContext`]: Base context with executor and data directory
11//! - [`LaunchContextWith<T>`]: Context with an attached value of type `T`
12//! - [`Attached<L, R>`]: Pairs values, preserving both previous (L) and new (R) state
13//!
14//! ### Helper Attachments
15//!
16//! - [`WithConfigs`]: Node config + TOML config
17//! - [`WithMeteredProvider`]: Provider factory with metrics
18//! - [`WithMeteredProviders`]: Provider factory + blockchain provider
19//! - [`WithComponents`]: Final form with all components
20//!
21//! ### Method Availability
22//!
23//! Methods are implemented on specific type combinations:
24//! - `impl<T> LaunchContextWith<T>`: Generic methods available for any attachment
25//! - `impl LaunchContextWith<WithConfigs>`: Config-specific methods
26//! - `impl LaunchContextWith<Attached<WithConfigs, DB>>`: Database operations
27//! - `impl LaunchContextWith<Attached<WithConfigs, ProviderFactory>>`: Provider operations
28//! - etc.
29//!
30//! This ensures correct initialization order without runtime checks.
31
32use crate::{
33    components::{NodeComponents, NodeComponentsBuilder},
34    hooks::OnComponentInitializedHook,
35    BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_consensus::BlockHeader as _;
38use alloy_eips::eip2124::Head;
39use alloy_primitives::{BlockNumber, B256};
40use eyre::Context;
41use rayon::ThreadPoolBuilder;
42use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
43use reth_config::{config::EtlConfig, PruneConfig};
44use reth_consensus::noop::NoopConsensus;
45use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
46use reth_db_common::init::{init_genesis, InitStorageError};
47use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
48use reth_engine_local::MiningMode;
49use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
50use reth_exex::ExExManagerHandle;
51use reth_fs_util as fs;
52use reth_network_p2p::headers::client::HeadersClient;
53use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
54use reth_node_core::{
55    args::DefaultEraHost,
56    dirs::{ChainPath, DataDirPath},
57    node_config::NodeConfig,
58    primitives::BlockHeader,
59    version::version_metadata,
60};
61use reth_node_metrics::{
62    chain::ChainSpecInfo,
63    hooks::Hooks,
64    recorder::install_prometheus_recorder,
65    server::{MetricServer, MetricServerConfig},
66    version::VersionInfo,
67};
68use reth_provider::{
69    providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
70    BlockHashReader, BlockNumReader, BlockReaderIdExt, ChainSpecProvider, ProviderError,
71    ProviderFactory, ProviderResult, StageCheckpointReader, StateProviderFactory,
72    StaticFileProviderFactory,
73};
74use reth_prune::{PruneModes, PrunerBuilder};
75use reth_rpc_builder::config::RethRpcServerConfig;
76use reth_rpc_layer::JwtSecret;
77use reth_stages::{
78    sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
79    StageId,
80};
81use reth_static_file::StaticFileProducer;
82use reth_tasks::TaskExecutor;
83use reth_tracing::tracing::{debug, error, info, warn};
84use reth_transaction_pool::TransactionPool;
85use std::{sync::Arc, thread::available_parallelism};
86use tokio::sync::{
87    mpsc::{unbounded_channel, UnboundedSender},
88    oneshot, watch,
89};
90
91use futures::{future::Either, stream, Stream, StreamExt};
92use reth_node_ethstats::EthStatsService;
93use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
94
95/// Reusable setup for launching a node.
96///
97/// This is the entry point for the node launch process. It implements a builder
98/// pattern using type-state programming to enforce correct initialization order.
99///
100/// ## Type Evolution
101///
102/// Starting from `LaunchContext`, each method transforms the type to reflect
103/// accumulated state:
104///
105/// ```text
106/// LaunchContext
107///   └─> LaunchContextWith<WithConfigs>
108///       └─> LaunchContextWith<Attached<WithConfigs, DB>>
109///           └─> LaunchContextWith<Attached<WithConfigs, ProviderFactory>>
110///               └─> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders>>
111///                   └─> LaunchContextWith<Attached<WithConfigs, WithComponents>>
112/// ```
113#[derive(Debug, Clone)]
114pub struct LaunchContext {
115    /// The task executor for the node.
116    pub task_executor: TaskExecutor,
117    /// The data directory for the node.
118    pub data_dir: ChainPath<DataDirPath>,
119}
120
121impl LaunchContext {
122    /// Create a new instance of the default node launcher.
123    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
124        Self { task_executor, data_dir }
125    }
126
127    /// Create launch context with attachment.
128    pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
129        LaunchContextWith { inner: self, attachment }
130    }
131
132    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
133    /// `config`.
134    ///
135    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
136    pub fn with_loaded_toml_config<ChainSpec>(
137        self,
138        config: NodeConfig<ChainSpec>,
139    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
140    where
141        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
142    {
143        let toml_config = self.load_toml_config(&config)?;
144        Ok(self.with(WithConfigs { config, toml_config }))
145    }
146
147    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
148    /// `config`.
149    ///
150    /// This is async because the trusted peers may have to be resolved.
151    pub fn load_toml_config<ChainSpec>(
152        &self,
153        config: &NodeConfig<ChainSpec>,
154    ) -> eyre::Result<reth_config::Config>
155    where
156        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
157    {
158        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
159
160        let mut toml_config = reth_config::Config::from_path(&config_path)
161            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
162
163        Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
164
165        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
166
167        // Update the config with the command line arguments
168        toml_config.peers.trusted_nodes_only = config.network.trusted_only;
169
170        Ok(toml_config)
171    }
172
173    /// Save prune config to the toml file if node is a full node.
174    fn save_pruning_config_if_full_node<ChainSpec>(
175        reth_config: &mut reth_config::Config,
176        config: &NodeConfig<ChainSpec>,
177        config_path: impl AsRef<std::path::Path>,
178    ) -> eyre::Result<()>
179    where
180        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
181    {
182        if reth_config.prune.is_none() {
183            if let Some(prune_config) = config.prune_config() {
184                reth_config.update_prune_config(prune_config);
185                info!(target: "reth::cli", "Saving prune config to toml file");
186                reth_config.save(config_path.as_ref())?;
187            }
188        } else if config.prune_config().is_none() {
189            warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
190        }
191        Ok(())
192    }
193
194    /// Convenience function to [`Self::configure_globals`]
195    pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
196        self.configure_globals(reserved_cpu_cores);
197        self
198    }
199
200    /// Configure global settings this includes:
201    ///
202    /// - Raising the file descriptor limit
203    /// - Configuring the global rayon thread pool with available parallelism. Honoring
204    ///   engine.reserved-cpu-cores to reserve given number of cores for O while using at least 1
205    ///   core for the rayon thread pool
206    pub fn configure_globals(&self, reserved_cpu_cores: usize) {
207        // Raise the fd limit of the process.
208        // Does not do anything on windows.
209        match fdlimit::raise_fd_limit() {
210            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
211                debug!(from, to, "Raised file descriptor limit");
212            }
213            Ok(fdlimit::Outcome::Unsupported) => {}
214            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
215        }
216
217        // Reserving the given number of CPU cores for the rest of OS.
218        // Users can reserve more cores by setting engine.reserved-cpu-cores
219        // Note: The global rayon thread pool will use at least one core.
220        let num_threads = available_parallelism()
221            .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
222        if let Err(err) = ThreadPoolBuilder::new()
223            .num_threads(num_threads)
224            .thread_name(|i| format!("reth-rayon-{i}"))
225            .build_global()
226        {
227            warn!(%err, "Failed to build global thread pool")
228        }
229    }
230}
231
232/// A [`LaunchContext`] along with an additional value.
233///
234/// The type parameter `T` represents the current state of the launch process.
235/// Methods are conditionally implemented based on `T`, ensuring operations
236/// are only available when their prerequisites are met.
237///
238/// For example:
239/// - Config methods when `T = WithConfigs<ChainSpec>`
240/// - Database operations when `T = Attached<WithConfigs<ChainSpec>, DB>`
241/// - Provider operations when `T = Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>`
242#[derive(Debug, Clone)]
243pub struct LaunchContextWith<T> {
244    /// The wrapped launch context.
245    pub inner: LaunchContext,
246    /// The additional attached value.
247    pub attachment: T,
248}
249
250impl<T> LaunchContextWith<T> {
251    /// Configure global settings this includes:
252    ///
253    /// - Raising the file descriptor limit
254    /// - Configuring the global rayon thread pool
255    pub fn configure_globals(&self, reserved_cpu_cores: u64) {
256        self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
257    }
258
259    /// Returns the data directory.
260    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
261        &self.inner.data_dir
262    }
263
264    /// Returns the task executor.
265    pub const fn task_executor(&self) -> &TaskExecutor {
266        &self.inner.task_executor
267    }
268
269    /// Attaches another value to the launch context.
270    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
271        LaunchContextWith {
272            inner: self.inner,
273            attachment: Attached::new(self.attachment, attachment),
274        }
275    }
276
277    /// Consumes the type and calls a function with a reference to the context.
278    // Returns the context again
279    pub fn inspect<F>(self, f: F) -> Self
280    where
281        F: FnOnce(&Self),
282    {
283        f(&self);
284        self
285    }
286}
287
288impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
289    /// Resolves the trusted peers and adds them to the toml config.
290    pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
291        if !self.attachment.config.network.trusted_peers.is_empty() {
292            info!(target: "reth::cli", "Adding trusted nodes");
293
294            self.attachment
295                .toml_config
296                .peers
297                .trusted_nodes
298                .extend(self.attachment.config.network.trusted_peers.clone());
299        }
300        Ok(self)
301    }
302}
303
304impl<L, R> LaunchContextWith<Attached<L, R>> {
305    /// Get a reference to the left value.
306    pub const fn left(&self) -> &L {
307        &self.attachment.left
308    }
309
310    /// Get a reference to the right value.
311    pub const fn right(&self) -> &R {
312        &self.attachment.right
313    }
314
315    /// Get a mutable reference to the left value.
316    pub const fn left_mut(&mut self) -> &mut L {
317        &mut self.attachment.left
318    }
319
320    /// Get a mutable reference to the right value.
321    pub const fn right_mut(&mut self) -> &mut R {
322        &mut self.attachment.right
323    }
324}
325impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
326    /// Adjust certain settings in the config to make sure they are set correctly
327    ///
328    /// This includes:
329    /// - Making sure the ETL dir is set to the datadir
330    /// - RPC settings are adjusted to the correct port
331    pub fn with_adjusted_configs(self) -> Self {
332        self.ensure_etl_datadir().with_adjusted_instance_ports()
333    }
334
335    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
336    pub fn ensure_etl_datadir(mut self) -> Self {
337        if self.toml_config_mut().stages.etl.dir.is_none() {
338            let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
339            if etl_path.exists() {
340                // Remove etl-path files on launch
341                if let Err(err) = fs::remove_dir_all(&etl_path) {
342                    warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
343                }
344            }
345            self.toml_config_mut().stages.etl.dir = Some(etl_path);
346        }
347
348        self
349    }
350
351    /// Change rpc port numbers based on the instance number.
352    pub fn with_adjusted_instance_ports(mut self) -> Self {
353        self.node_config_mut().adjust_instance_ports();
354        self
355    }
356
357    /// Returns the container for all config types
358    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
359        self.attachment.left()
360    }
361
362    /// Returns the attached [`NodeConfig`].
363    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
364        &self.left().config
365    }
366
367    /// Returns the attached [`NodeConfig`].
368    pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
369        &mut self.left_mut().config
370    }
371
372    /// Returns the attached toml config [`reth_config::Config`].
373    pub const fn toml_config(&self) -> &reth_config::Config {
374        &self.left().toml_config
375    }
376
377    /// Returns the attached toml config [`reth_config::Config`].
378    pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
379        &mut self.left_mut().toml_config
380    }
381
382    /// Returns the configured chain spec.
383    pub fn chain_spec(&self) -> Arc<ChainSpec> {
384        self.node_config().chain.clone()
385    }
386
387    /// Get the hash of the genesis block.
388    pub fn genesis_hash(&self) -> B256 {
389        self.node_config().chain.genesis_hash()
390    }
391
392    /// Returns the chain identifier of the node.
393    pub fn chain_id(&self) -> Chain {
394        self.node_config().chain.chain()
395    }
396
397    /// Returns true if the node is configured as --dev
398    pub const fn is_dev(&self) -> bool {
399        self.node_config().dev.dev
400    }
401
402    /// Returns the configured [`PruneConfig`]
403    ///
404    /// Any configuration set in CLI will take precedence over those set in toml
405    pub fn prune_config(&self) -> Option<PruneConfig>
406    where
407        ChainSpec: reth_chainspec::EthereumHardforks,
408    {
409        let Some(mut node_prune_config) = self.node_config().prune_config() else {
410            // No CLI config is set, use the toml config.
411            return self.toml_config().prune.clone();
412        };
413
414        // Otherwise, use the CLI configuration and merge with toml config.
415        node_prune_config.merge(self.toml_config().prune.clone());
416        Some(node_prune_config)
417    }
418
419    /// Returns the configured [`PruneModes`], returning the default if no config was available.
420    pub fn prune_modes(&self) -> PruneModes
421    where
422        ChainSpec: reth_chainspec::EthereumHardforks,
423    {
424        self.prune_config().map(|config| config.segments).unwrap_or_default()
425    }
426
427    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
428    pub fn pruner_builder(&self) -> PrunerBuilder
429    where
430        ChainSpec: reth_chainspec::EthereumHardforks,
431    {
432        PrunerBuilder::new(self.prune_config().unwrap_or_default())
433            .delete_limit(self.chain_spec().prune_delete_limit())
434            .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
435    }
436
437    /// Loads the JWT secret for the engine API
438    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
439        let default_jwt_path = self.data_dir().jwt();
440        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
441        Ok(secret)
442    }
443
444    /// Returns the [`MiningMode`] intended for --dev mode.
445    pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
446    where
447        Pool: TransactionPool + Unpin,
448    {
449        if let Some(interval) = self.node_config().dev.block_time {
450            MiningMode::interval(interval)
451        } else {
452            MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
453        }
454    }
455}
456
457impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
458where
459    DB: Database + Clone + 'static,
460    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
461{
462    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistent check
463    /// between the database and static files. **It may execute a pipeline unwind if it fails this
464    /// check.**
465    pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
466    where
467        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
468        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
469    {
470        let factory = ProviderFactory::new(
471            self.right().clone(),
472            self.chain_spec(),
473            StaticFileProvider::read_write(self.data_dir().static_files())?,
474        )
475        .with_prune_modes(self.prune_modes())
476        .with_static_files_metrics();
477
478        let has_receipt_pruning =
479            self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
480
481        // Check for consistency between database and static files. If it fails, it unwinds to
482        // the first block that's consistent between database and static files.
483        if let Some(unwind_target) = factory
484            .static_file_provider()
485            .check_consistency(&factory.provider()?, has_receipt_pruning)?
486        {
487            // Highly unlikely to happen, and given its destructive nature, it's better to panic
488            // instead.
489            assert_ne!(
490                unwind_target,
491                PipelineTarget::Unwind(0),
492                "A static file <> database inconsistency was found that would trigger an unwind to block 0"
493            );
494
495            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
496
497            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
498
499            // Builds an unwind-only pipeline
500            let pipeline = PipelineBuilder::default()
501                .add_stages(DefaultStages::new(
502                    factory.clone(),
503                    tip_rx,
504                    Arc::new(NoopConsensus::default()),
505                    NoopHeaderDownloader::default(),
506                    NoopBodiesDownloader::default(),
507                    NoopEvmConfig::<Evm>::default(),
508                    self.toml_config().stages.clone(),
509                    self.prune_modes(),
510                    None,
511                ))
512                .build(
513                    factory.clone(),
514                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
515                );
516
517            // Unwinds to block
518            let (tx, rx) = oneshot::channel();
519
520            // Pipeline should be run as blocking and panic if it fails.
521            self.task_executor().spawn_critical_blocking(
522                "pipeline task",
523                Box::pin(async move {
524                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
525                    let _ = tx.send(result);
526                }),
527            );
528            rx.await?.inspect_err(|err| {
529                error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
530            })?;
531        }
532
533        Ok(factory)
534    }
535
536    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
537    pub async fn with_provider_factory<N, Evm>(
538        self,
539    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
540    where
541        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
542        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
543    {
544        let factory = self.create_provider_factory::<N, Evm>().await?;
545        let ctx = LaunchContextWith {
546            inner: self.inner,
547            attachment: self.attachment.map_right(|_| factory),
548        };
549
550        Ok(ctx)
551    }
552}
553
554impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
555where
556    T: ProviderNodeTypes,
557{
558    /// Returns access to the underlying database.
559    pub const fn database(&self) -> &T::DB {
560        self.right().db_ref()
561    }
562
563    /// Returns the configured `ProviderFactory`.
564    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
565        self.right()
566    }
567
568    /// Returns the static file provider to interact with the static files.
569    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
570        self.right().static_file_provider()
571    }
572
573    /// This launches the prometheus endpoint.
574    ///
575    /// Convenience function to [`Self::start_prometheus_endpoint`]
576    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
577        self.start_prometheus_endpoint().await?;
578        Ok(self)
579    }
580
581    /// Starts the prometheus endpoint.
582    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
583        // ensure recorder runs upkeep periodically
584        install_prometheus_recorder().spawn_upkeep();
585
586        let listen_addr = self.node_config().metrics;
587        if let Some(addr) = listen_addr {
588            info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
589            let config = MetricServerConfig::new(
590                addr,
591                VersionInfo {
592                    version: version_metadata().cargo_pkg_version.as_ref(),
593                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
594                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
595                    git_sha: version_metadata().vergen_git_sha.as_ref(),
596                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
597                    build_profile: version_metadata().build_profile_name.as_ref(),
598                },
599                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
600                self.task_executor().clone(),
601                Hooks::builder()
602                    .with_hook({
603                        let db = self.database().clone();
604                        move || db.report_metrics()
605                    })
606                    .with_hook({
607                        let sfp = self.static_file_provider();
608                        move || {
609                            if let Err(error) = sfp.report_metrics() {
610                                error!(%error, "Failed to report metrics for the static file provider");
611                            }
612                        }
613                    })
614                    .build(),
615            );
616
617            MetricServer::new(config).serve().await?;
618        }
619
620        Ok(())
621    }
622
623    /// Convenience function to [`Self::init_genesis`]
624    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
625        init_genesis(self.provider_factory())?;
626        Ok(self)
627    }
628
629    /// Write the genesis block and state if it has not already been written
630    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
631        init_genesis(self.provider_factory())
632    }
633
634    /// Creates a new `WithMeteredProvider` container and attaches it to the
635    /// launch context.
636    ///
637    /// This spawns a metrics task that listens for metrics related events and updates metrics for
638    /// prometheus.
639    pub fn with_metrics_task(
640        self,
641    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
642        let (metrics_sender, metrics_receiver) = unbounded_channel();
643
644        let with_metrics =
645            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
646
647        debug!(target: "reth::cli", "Spawning stages metrics listener task");
648        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
649        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
650
651        LaunchContextWith {
652            inner: self.inner,
653            attachment: self.attachment.map_right(|_| with_metrics),
654        }
655    }
656}
657
658impl<N, DB>
659    LaunchContextWith<
660        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
661    >
662where
663    N: NodeTypes,
664    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
665{
666    /// Returns the configured `ProviderFactory`.
667    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
668        &self.right().provider_factory
669    }
670
671    /// Returns the metrics sender.
672    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
673        self.right().metrics_sender.clone()
674    }
675
676    /// Creates a `BlockchainProvider` and attaches it to the launch context.
677    #[expect(clippy::complexity)]
678    pub fn with_blockchain_db<T, F>(
679        self,
680        create_blockchain_provider: F,
681    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
682    where
683        T: FullNodeTypes<Types = N, DB = DB>,
684        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
685    {
686        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
687
688        let metered_providers = WithMeteredProviders {
689            db_provider_container: WithMeteredProvider {
690                provider_factory: self.provider_factory().clone(),
691                metrics_sender: self.sync_metrics_tx(),
692            },
693            blockchain_db,
694        };
695
696        let ctx = LaunchContextWith {
697            inner: self.inner,
698            attachment: self.attachment.map_right(|_| metered_providers),
699        };
700
701        Ok(ctx)
702    }
703}
704
705impl<T>
706    LaunchContextWith<
707        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
708    >
709where
710    T: FullNodeTypes<Types: NodeTypesForProvider>,
711{
712    /// Returns access to the underlying database.
713    pub const fn database(&self) -> &T::DB {
714        self.provider_factory().db_ref()
715    }
716
717    /// Returns the configured `ProviderFactory`.
718    pub const fn provider_factory(
719        &self,
720    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
721        &self.right().db_provider_container.provider_factory
722    }
723
724    /// Fetches the head block from the database.
725    ///
726    /// If the database is empty, returns the genesis block.
727    pub fn lookup_head(&self) -> eyre::Result<Head> {
728        self.node_config()
729            .lookup_head(self.provider_factory())
730            .wrap_err("the head block is missing")
731    }
732
733    /// Returns the metrics sender.
734    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
735        self.right().db_provider_container.metrics_sender.clone()
736    }
737
738    /// Returns a reference to the blockchain provider.
739    pub const fn blockchain_db(&self) -> &T::Provider {
740        &self.right().blockchain_db
741    }
742
743    /// Creates a `NodeAdapter` and attaches it to the launch context.
744    pub async fn with_components<CB>(
745        self,
746        components_builder: CB,
747        on_component_initialized: Box<
748            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
749        >,
750    ) -> eyre::Result<
751        LaunchContextWith<
752            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
753        >,
754    >
755    where
756        CB: NodeComponentsBuilder<T>,
757    {
758        // fetch the head block from the database
759        let head = self.lookup_head()?;
760
761        let builder_ctx = BuilderContext::new(
762            head,
763            self.blockchain_db().clone(),
764            self.task_executor().clone(),
765            self.configs().clone(),
766        );
767
768        debug!(target: "reth::cli", "creating components");
769        let components = components_builder.build_components(&builder_ctx).await?;
770
771        let blockchain_db = self.blockchain_db().clone();
772
773        let node_adapter = NodeAdapter {
774            components,
775            task_executor: self.task_executor().clone(),
776            provider: blockchain_db,
777        };
778
779        debug!(target: "reth::cli", "calling on_component_initialized hook");
780        on_component_initialized.on_event(node_adapter.clone())?;
781
782        let components_container = WithComponents {
783            db_provider_container: WithMeteredProvider {
784                provider_factory: self.provider_factory().clone(),
785                metrics_sender: self.sync_metrics_tx(),
786            },
787            node_adapter,
788            head,
789        };
790
791        let ctx = LaunchContextWith {
792            inner: self.inner,
793            attachment: self.attachment.map_right(|_| components_container),
794        };
795
796        Ok(ctx)
797    }
798}
799
800impl<T, CB>
801    LaunchContextWith<
802        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
803    >
804where
805    T: FullNodeTypes<Types: NodeTypesForProvider>,
806    CB: NodeComponentsBuilder<T>,
807{
808    /// Returns the configured `ProviderFactory`.
809    pub const fn provider_factory(
810        &self,
811    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
812        &self.right().db_provider_container.provider_factory
813    }
814
815    /// Returns the max block that the node should run to, looking it up from the network if
816    /// necessary
817    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
818    where
819        C: HeadersClient<Header: BlockHeader>,
820    {
821        self.node_config().max_block(client, self.provider_factory().clone()).await
822    }
823
824    /// Returns the static file provider to interact with the static files.
825    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
826        self.provider_factory().static_file_provider()
827    }
828
829    /// Creates a new [`StaticFileProducer`] with the attached database.
830    pub fn static_file_producer(
831        &self,
832    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
833        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
834    }
835
836    /// Returns the current head block.
837    pub const fn head(&self) -> Head {
838        self.right().head
839    }
840
841    /// Returns the configured `NodeAdapter`.
842    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
843        &self.right().node_adapter
844    }
845
846    /// Returns mutable reference to the configured `NodeAdapter`.
847    pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
848        &mut self.right_mut().node_adapter
849    }
850
851    /// Returns a reference to the blockchain provider.
852    pub const fn blockchain_db(&self) -> &T::Provider {
853        &self.node_adapter().provider
854    }
855
856    /// Returns the initial backfill to sync to at launch.
857    ///
858    /// This returns the configured `debug.tip` if set, otherwise it will check if backfill was
859    /// previously interrupted and returns the block hash of the last checkpoint, see also
860    /// [`Self::check_pipeline_consistency`]
861    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
862        let mut initial_target = self.node_config().debug.tip;
863
864        if initial_target.is_none() {
865            initial_target = self.check_pipeline_consistency()?;
866        }
867
868        Ok(initial_target)
869    }
870
871    /// Returns true if the node should terminate after the initial backfill run.
872    ///
873    /// This is the case if any of these configs are set:
874    ///  `--debug.max-block`
875    ///  `--debug.terminate`
876    pub const fn terminate_after_initial_backfill(&self) -> bool {
877        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
878    }
879
880    /// Ensures that the database matches chain-specific requirements.
881    ///
882    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
883    /// bedrock height)
884    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
885        if self.chain_spec().is_optimism() &&
886            !self.is_dev() &&
887            self.chain_id() == Chain::optimism_mainnet()
888        {
889            let latest = self.blockchain_db().last_block_number()?;
890            // bedrock height
891            if latest < 105235063 {
892                error!(
893                    "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
894                );
895                return Err(ProviderError::BestBlockNotFound)
896            }
897        }
898
899        Ok(())
900    }
901
902    /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
903    /// than the checkpoint of the first stage).
904    ///
905    /// This will return the pipeline target if:
906    ///  * the pipeline was interrupted during its previous run
907    ///  * a new stage was added
908    ///  * stage data was dropped manually through `reth stage drop ...`
909    ///
910    /// # Returns
911    ///
912    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
913    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
914        // If no target was provided, check if the stages are congruent - check if the
915        // checkpoint of the last stage matches the checkpoint of the first.
916        let first_stage_checkpoint = self
917            .blockchain_db()
918            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
919            .unwrap_or_default()
920            .block_number;
921
922        // Skip the first stage as we've already retrieved it and comparing all other checkpoints
923        // against it.
924        for stage_id in StageId::ALL.iter().skip(1) {
925            let stage_checkpoint = self
926                .blockchain_db()
927                .get_stage_checkpoint(*stage_id)?
928                .unwrap_or_default()
929                .block_number;
930
931            // If the checkpoint of any stage is less than the checkpoint of the first stage,
932            // retrieve and return the block hash of the latest header and use it as the target.
933            if stage_checkpoint < first_stage_checkpoint {
934                debug!(
935                    target: "consensus::engine",
936                    first_stage_checkpoint,
937                    inconsistent_stage_id = %stage_id,
938                    inconsistent_stage_checkpoint = stage_checkpoint,
939                    "Pipeline sync progress is inconsistent"
940                );
941                return self.blockchain_db().block_hash(first_stage_checkpoint);
942            }
943        }
944
945        self.ensure_chain_specific_db_checks()?;
946
947        Ok(None)
948    }
949
950    /// Expire the pre-merge transactions if the node is configured to do so and the chain has a
951    /// merge block.
952    ///
953    /// If the node is configured to prune pre-merge transactions and it has synced past the merge
954    /// block, it will delete the pre-merge transaction static files if they still exist.
955    pub fn expire_pre_merge_transactions(&self) -> eyre::Result<()>
956    where
957        T: FullNodeTypes<Provider: StaticFileProviderFactory>,
958    {
959        if self.node_config().pruning.bodies_pre_merge {
960            if let Some(merge_block) =
961                self.chain_spec().ethereum_fork_activation(EthereumHardfork::Paris).block_number()
962            {
963                // Ensure we only expire transactions after we synced past the merge block.
964                let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
965                if latest.number() > merge_block {
966                    let provider = self.blockchain_db().static_file_provider();
967                    if provider.get_lowest_transaction_static_file_block() < Some(merge_block) {
968                        info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
969                        provider.delete_transactions_below(merge_block)?;
970                    } else {
971                        debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
972                    }
973                }
974            }
975        }
976
977        Ok(())
978    }
979
980    /// Returns the metrics sender.
981    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
982        self.right().db_provider_container.metrics_sender.clone()
983    }
984
985    /// Returns the node adapter components.
986    pub const fn components(&self) -> &CB::Components {
987        &self.node_adapter().components
988    }
989
990    /// Launches ExEx (Execution Extensions) and returns the ExEx manager handle.
991    #[allow(clippy::type_complexity)]
992    pub async fn launch_exex(
993        &self,
994        installed_exex: Vec<(
995            String,
996            Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
997        )>,
998    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
999        ExExLauncher::new(
1000            self.head(),
1001            self.node_adapter().clone(),
1002            installed_exex,
1003            self.configs().clone(),
1004        )
1005        .launch()
1006        .await
1007    }
1008
1009    /// Creates the ERA import source based on node configuration.
1010    ///
1011    /// Returns `Some(EraImportSource)` if ERA is enabled in the node config, otherwise `None`.
1012    pub fn era_import_source(&self) -> Option<EraImportSource> {
1013        let node_config = self.node_config();
1014        if !node_config.era.enabled {
1015            return None;
1016        }
1017
1018        EraImportSource::maybe_new(
1019            node_config.era.source.path.clone(),
1020            node_config.era.source.url.clone(),
1021            || node_config.chain.chain().kind().default_era_host(),
1022            || node_config.datadir().data_dir().join("era").into(),
1023        )
1024    }
1025
1026    /// Creates consensus layer health events stream based on node configuration.
1027    ///
1028    /// Returns a stream that monitors consensus layer health if:
1029    /// - No debug tip is configured
1030    /// - Not running in dev mode
1031    ///
1032    /// Otherwise returns an empty stream.
1033    pub fn consensus_layer_events(
1034        &self,
1035    ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1036    where
1037        T::Provider: reth_provider::CanonChainTracker,
1038    {
1039        if self.node_config().debug.tip.is_none() && !self.is_dev() {
1040            Either::Left(
1041                ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1042                    .map(Into::into),
1043            )
1044        } else {
1045            Either::Right(stream::empty())
1046        }
1047    }
1048
1049    /// Spawns the [`EthStatsService`] service if configured.
1050    pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1051        let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1052
1053        let network = self.components().network().clone();
1054        let pool = self.components().pool().clone();
1055        let provider = self.node_adapter().provider.clone();
1056
1057        info!(target: "reth::cli", "Starting EthStats service at {}", url);
1058
1059        let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1060        tokio::spawn(async move { ethstats.run().await });
1061
1062        Ok(())
1063    }
1064}
1065
1066impl<T, CB>
1067    LaunchContextWith<
1068        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
1069    >
1070where
1071    T: FullNodeTypes<
1072        Provider: StateProviderFactory + ChainSpecProvider,
1073        Types: NodeTypesForProvider,
1074    >,
1075    CB: NodeComponentsBuilder<T>,
1076{
1077}
1078
1079/// Joins two attachments together, preserving access to both values.
1080///
1081/// This type enables the launch process to accumulate state while maintaining
1082/// access to all previously attached components. The `left` field holds the
1083/// previous state, while `right` holds the newly attached component.
1084#[derive(Clone, Copy, Debug)]
1085pub struct Attached<L, R> {
1086    left: L,
1087    right: R,
1088}
1089
1090impl<L, R> Attached<L, R> {
1091    /// Creates a new `Attached` with the given values.
1092    pub const fn new(left: L, right: R) -> Self {
1093        Self { left, right }
1094    }
1095
1096    /// Maps the left value to a new value.
1097    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1098    where
1099        F: FnOnce(L) -> T,
1100    {
1101        Attached::new(f(self.left), self.right)
1102    }
1103
1104    /// Maps the right value to a new value.
1105    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1106    where
1107        F: FnOnce(R) -> T,
1108    {
1109        Attached::new(self.left, f(self.right))
1110    }
1111
1112    /// Get a reference to the left value.
1113    pub const fn left(&self) -> &L {
1114        &self.left
1115    }
1116
1117    /// Get a reference to the right value.
1118    pub const fn right(&self) -> &R {
1119        &self.right
1120    }
1121
1122    /// Get a mutable reference to the left value.
1123    pub const fn left_mut(&mut self) -> &mut L {
1124        &mut self.left
1125    }
1126
1127    /// Get a mutable reference to the right value.
1128    pub const fn right_mut(&mut self) -> &mut R {
1129        &mut self.right
1130    }
1131}
1132
1133/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
1134/// reth.toml config
1135#[derive(Debug)]
1136pub struct WithConfigs<ChainSpec> {
1137    /// The configured, usually derived from the CLI.
1138    pub config: NodeConfig<ChainSpec>,
1139    /// The loaded reth.toml config.
1140    pub toml_config: reth_config::Config,
1141}
1142
1143impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1144    fn clone(&self) -> Self {
1145        Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1146    }
1147}
1148
1149/// Helper container type to bundle the [`ProviderFactory`] and the metrics
1150/// sender.
1151#[derive(Debug, Clone)]
1152pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1153    provider_factory: ProviderFactory<N>,
1154    metrics_sender: UnboundedSender<MetricEvent>,
1155}
1156
1157/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
1158/// and a metrics sender.
1159#[expect(missing_debug_implementations)]
1160pub struct WithMeteredProviders<T>
1161where
1162    T: FullNodeTypes,
1163{
1164    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1165    blockchain_db: T::Provider,
1166}
1167
1168/// Helper container to bundle the metered providers container and [`NodeAdapter`].
1169#[expect(missing_debug_implementations)]
1170pub struct WithComponents<T, CB>
1171where
1172    T: FullNodeTypes,
1173    CB: NodeComponentsBuilder<T>,
1174{
1175    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1176    node_adapter: NodeAdapter<T, CB::Components>,
1177    head: Head,
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182    use super::{LaunchContext, NodeConfig};
1183    use reth_config::Config;
1184    use reth_node_core::args::PruningArgs;
1185
1186    const EXTENSION: &str = "toml";
1187
1188    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1189        let temp_dir = tempfile::tempdir().unwrap();
1190        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1191        proc(&config_path);
1192        temp_dir.close().unwrap()
1193    }
1194
1195    #[test]
1196    fn test_save_prune_config() {
1197        with_tempdir("prune-store-test", |config_path| {
1198            let mut reth_config = Config::default();
1199            let node_config = NodeConfig {
1200                pruning: PruningArgs {
1201                    full: true,
1202                    block_interval: None,
1203                    sender_recovery_full: false,
1204                    sender_recovery_distance: None,
1205                    sender_recovery_before: None,
1206                    transaction_lookup_full: false,
1207                    transaction_lookup_distance: None,
1208                    transaction_lookup_before: None,
1209                    receipts_full: false,
1210                    receipts_pre_merge: false,
1211                    receipts_distance: None,
1212                    receipts_before: None,
1213                    account_history_full: false,
1214                    account_history_distance: None,
1215                    account_history_before: None,
1216                    storage_history_full: false,
1217                    storage_history_distance: None,
1218                    storage_history_before: None,
1219                    bodies_pre_merge: false,
1220                    bodies_distance: None,
1221                    receipts_log_filter: None,
1222                    bodies_before: None,
1223                },
1224                ..NodeConfig::test()
1225            };
1226            LaunchContext::save_pruning_config_if_full_node(
1227                &mut reth_config,
1228                &node_config,
1229                config_path,
1230            )
1231            .unwrap();
1232
1233            let loaded_config = Config::from_path(config_path).unwrap();
1234
1235            assert_eq!(reth_config, loaded_config);
1236        })
1237    }
1238}