reth_node_builder/launch/common.rs

//! Helper types that can be used by launchers.
//!
//! ## Launch Context Type System
//!
//! The node launch process uses a type-state pattern to ensure correct initialization
//! order at compile time. Methods are only available when their prerequisites are met.
//!
//! ### Core Types
//!
//! - [`LaunchContext`]: Base context with executor and data directory
//! - [`LaunchContextWith<T>`]: Context with an attached value of type `T`
//! - [`Attached<L, R>`]: Pairs values, preserving both previous (L) and new (R) state
//!
//! ### Helper Attachments
//!
//! - [`WithConfigs`]: Node config + TOML config
//! - [`WithMeteredProvider`]: Provider factory with metrics
//! - [`WithMeteredProviders`]: Provider factory + blockchain provider
//! - [`WithComponents`]: Final form with all components
//!
//! ### Method Availability
//!
//! Methods are implemented on specific type combinations:
//! - `impl<T> LaunchContextWith<T>`: Generic methods available for any attachment
//! - `impl LaunchContextWith<WithConfigs>`: Config-specific methods
//! - `impl LaunchContextWith<Attached<WithConfigs, DB>>`: Database operations
//! - `impl LaunchContextWith<Attached<WithConfigs, ProviderFactory>>`: Provider operations
//! - etc.
//!
//! This ensures correct initialization order without runtime checks.
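//!
//! ### Example
//!
//! A minimal, illustrative sketch of how the type-state methods chain together (not a complete
//! launcher; `executor`, `data_dir`, `config`, `db` and the `N`/`Evm` type parameters are assumed
//! to be provided by the surrounding launch code):
//!
//! ```ignore
//! let ctx = LaunchContext::new(executor, data_dir)
//!     // -> LaunchContextWith<WithConfigs<_>>
//!     .with_loaded_toml_config(config)?
//!     // -> LaunchContextWith<Attached<WithConfigs<_>, DB>>
//!     .attach(db)
//!     // -> LaunchContextWith<Attached<WithConfigs<_>, ProviderFactory<N>>>
//!     .with_provider_factory::<N, Evm>()
//!     .await?;
//! ```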

use crate::{
    components::{NodeComponents, NodeComponentsBuilder},
    hooks::OnComponentInitializedHook,
    BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
};
use alloy_consensus::BlockHeader as _;
use alloy_eips::eip2124::Head;
use alloy_primitives::{BlockNumber, B256};
use eyre::Context;
use rayon::ThreadPoolBuilder;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitStorageError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_engine_local::MiningMode;
use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
use reth_exex::ExExManagerHandle;
use reth_fs_util as fs;
use reth_network_p2p::headers::client::HeadersClient;
use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::{
    args::DefaultEraHost,
    dirs::{ChainPath, DataDirPath},
    node_config::NodeConfig,
    primitives::BlockHeader,
    version::version_metadata,
};
use reth_node_metrics::{
    chain::ChainSpecInfo,
    hooks::Hooks,
    recorder::install_prometheus_recorder,
    server::{MetricServer, MetricServerConfig},
    version::VersionInfo,
};
use reth_provider::{
    providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
    BlockHashReader, BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory,
    ProviderResult, StageCheckpointReader, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{
    sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
    StageId,
};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, thread::available_parallelism};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedSender},
    oneshot, watch,
};

use futures::{future::Either, stream, Stream, StreamExt};
use reth_node_ethstats::EthStatsService;
use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};

/// Reusable setup for launching a node.
///
/// This is the entry point for the node launch process. It implements a builder
/// pattern using type-state programming to enforce correct initialization order.
///
/// ## Type Evolution
///
/// Starting from `LaunchContext`, each method transforms the type to reflect
/// accumulated state:
///
/// ```text
/// LaunchContext
///   └─> LaunchContextWith<WithConfigs>
///       └─> LaunchContextWith<Attached<WithConfigs, DB>>
///           └─> LaunchContextWith<Attached<WithConfigs, ProviderFactory>>
///               └─> LaunchContextWith<Attached<WithConfigs, WithMeteredProviders>>
///                   └─> LaunchContextWith<Attached<WithConfigs, WithComponents>>
/// ```
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// The task executor for the node.
    pub task_executor: TaskExecutor,
    /// The data directory for the node.
    pub data_dir: ChainPath<DataDirPath>,
}

impl LaunchContext {
    /// Create a new instance of the default node launcher.
    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
        Self { task_executor, data_dir }
    }

    /// Create launch context with attachment.
    pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
        LaunchContextWith { inner: self, attachment }
    }

    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
    /// `config`.
    ///
    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
    pub fn with_loaded_toml_config<ChainSpec>(
        self,
        config: NodeConfig<ChainSpec>,
    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
    where
        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
    {
        let toml_config = self.load_toml_config(&config)?;
        Ok(self.with(WithConfigs { config, toml_config }))
    }

    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
    /// `config`.
    pub fn load_toml_config<ChainSpec>(
        &self,
        config: &NodeConfig<ChainSpec>,
    ) -> eyre::Result<reth_config::Config>
    where
        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
    {
        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());

        let mut toml_config = reth_config::Config::from_path(&config_path)
            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;

        Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;

        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");

        // Update the config with the command line arguments
        toml_config.peers.trusted_nodes_only = config.network.trusted_only;

        Ok(toml_config)
    }

    /// Saves the prune config to the toml file if the node is a full node.
    fn save_pruning_config_if_full_node<ChainSpec>(
        reth_config: &mut reth_config::Config,
        config: &NodeConfig<ChainSpec>,
        config_path: impl AsRef<std::path::Path>,
    ) -> eyre::Result<()>
    where
        ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
    {
        if reth_config.prune.is_none() {
            if let Some(prune_config) = config.prune_config() {
                reth_config.update_prune_config(prune_config);
                info!(target: "reth::cli", "Saving prune config to toml file");
                reth_config.save(config_path.as_ref())?;
            }
        } else if config.prune_config().is_none() {
            warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
        }
        Ok(())
    }

    /// Convenience function for [`Self::configure_globals`].
    pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
        self.configure_globals(reserved_cpu_cores);
        self
    }

    /// Configures global settings. This includes:
    ///
    /// - Raising the file descriptor limit
    /// - Configuring the global rayon thread pool with the available parallelism, honoring
    ///   `engine.reserved-cpu-cores` to reserve the given number of cores for the OS while using
    ///   at least 1 core for the rayon thread pool
    pub fn configure_globals(&self, reserved_cpu_cores: usize) {
        // Raise the fd limit of the process.
        // Does not do anything on windows.
        match fdlimit::raise_fd_limit() {
            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
                debug!(from, to, "Raised file descriptor limit");
            }
            Ok(fdlimit::Outcome::Unsupported) => {}
            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
        }

        // Reserve the given number of CPU cores for the rest of the OS.
        // Users can reserve more cores by setting `engine.reserved-cpu-cores`.
        // Note: The global rayon thread pool will use at least one core.
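        // If `available_parallelism()` fails we pass 0, which lets rayon pick its default size.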
        let num_threads = available_parallelism()
            .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
        if let Err(err) = ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .thread_name(|i| format!("reth-rayon-{i}"))
            .build_global()
        {
            warn!(%err, "Failed to build global thread pool")
        }
    }
}

/// A [`LaunchContext`] along with an additional value.
///
/// The type parameter `T` represents the current state of the launch process.
/// Methods are conditionally implemented based on `T`, ensuring operations
/// are only available when their prerequisites are met.
///
/// For example:
/// - Config methods when `T = WithConfigs<ChainSpec>`
/// - Database operations when `T = Attached<WithConfigs<ChainSpec>, DB>`
/// - Provider operations when `T = Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>`
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped launch context.
    pub inner: LaunchContext,
    /// The additional attached value.
    pub attachment: T,
}

impl<T> LaunchContextWith<T> {
    /// Configures global settings. This includes:
    ///
    /// - Raising the file descriptor limit
    /// - Configuring the global rayon thread pool
    pub fn configure_globals(&self, reserved_cpu_cores: u64) {
        self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
    }

    /// Returns the data directory.
    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
        &self.inner.data_dir
    }

    /// Returns the task executor.
    pub const fn task_executor(&self) -> &TaskExecutor {
        &self.inner.task_executor
    }

    /// Attaches another value to the launch context.
    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
        LaunchContextWith {
            inner: self.inner,
            attachment: Attached::new(self.attachment, attachment),
        }
    }

    /// Consumes the type and calls a function with a reference to the context.
    ///
    /// Returns the context again.
    pub fn inspect<F>(self, f: F) -> Self
    where
        F: FnOnce(&Self),
    {
        f(&self);
        self
    }
}

impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
    /// Resolves the trusted peers and adds them to the toml config.
    pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
        if !self.attachment.config.network.trusted_peers.is_empty() {
            info!(target: "reth::cli", "Adding trusted nodes");

            self.attachment
                .toml_config
                .peers
                .trusted_nodes
                .extend(self.attachment.config.network.trusted_peers.clone());
        }
        Ok(self)
    }
}

impl<L, R> LaunchContextWith<Attached<L, R>> {
    /// Get a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.attachment.left
    }

    /// Get a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.attachment.right
    }

    /// Get a mutable reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.attachment.left
    }

    /// Get a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.attachment.right
    }
}

impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
    /// Adjust certain settings in the config to make sure they are set correctly
    ///
    /// This includes:
    /// - Making sure the ETL dir is set to the datadir
    /// - RPC settings are adjusted to the correct port
    pub fn with_adjusted_configs(self) -> Self {
        self.ensure_etl_datadir().with_adjusted_instance_ports()
    }

    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
    pub fn ensure_etl_datadir(mut self) -> Self {
        if self.toml_config_mut().stages.etl.dir.is_none() {
            let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
            if etl_path.exists() {
                // Remove etl-path files on launch
                if let Err(err) = fs::remove_dir_all(&etl_path) {
                    warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
                }
            }
            self.toml_config_mut().stages.etl.dir = Some(etl_path);
        }

        self
    }

    /// Change rpc port numbers based on the instance number.
    pub fn with_adjusted_instance_ports(mut self) -> Self {
        self.node_config_mut().adjust_instance_ports();
        self
    }

    /// Returns the container for all config types
    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
        self.attachment.left()
    }

    /// Returns the attached [`NodeConfig`].
    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
        &self.left().config
    }

    /// Returns the attached [`NodeConfig`].
    pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
        &mut self.left_mut().config
    }

    /// Returns the attached toml config [`reth_config::Config`].
    pub const fn toml_config(&self) -> &reth_config::Config {
        &self.left().toml_config
    }

    /// Returns the attached toml config [`reth_config::Config`].
    pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
        &mut self.left_mut().toml_config
    }

    /// Returns the configured chain spec.
    pub fn chain_spec(&self) -> Arc<ChainSpec> {
        self.node_config().chain.clone()
    }

    /// Get the hash of the genesis block.
    pub fn genesis_hash(&self) -> B256 {
        self.node_config().chain.genesis_hash()
    }

    /// Returns the chain identifier of the node.
    pub fn chain_id(&self) -> Chain {
        self.node_config().chain.chain()
    }

    /// Returns true if the node is configured as --dev
    pub const fn is_dev(&self) -> bool {
        self.node_config().dev.dev
    }

    /// Returns the configured [`PruneConfig`].
    ///
    /// Any configuration set via the CLI takes precedence over the settings in the toml file.
    pub fn prune_config(&self) -> Option<PruneConfig>
    where
        ChainSpec: reth_chainspec::EthereumHardforks,
    {
        let Some(mut node_prune_config) = self.node_config().prune_config() else {
            // No CLI config is set, use the toml config.
            return self.toml_config().prune.clone();
        };

        // Otherwise, use the CLI configuration and merge with toml config.
        node_prune_config.merge(self.toml_config().prune.clone());
        Some(node_prune_config)
    }

    /// Returns the configured [`PruneModes`], returning the default if no config was available.
    pub fn prune_modes(&self) -> PruneModes
    where
        ChainSpec: reth_chainspec::EthereumHardforks,
    {
        self.prune_config().map(|config| config.segments).unwrap_or_default()
    }

    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
    pub fn pruner_builder(&self) -> PrunerBuilder
    where
        ChainSpec: reth_chainspec::EthereumHardforks,
    {
        PrunerBuilder::new(self.prune_config().unwrap_or_default())
            .delete_limit(self.chain_spec().prune_delete_limit())
            .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
    }

    /// Loads the JWT secret for the engine API
    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
        let default_jwt_path = self.data_dir().jwt();
        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
        Ok(secret)
    }

    /// Returns the [`MiningMode`] intended for --dev mode.
    pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
    where
        Pool: TransactionPool + Unpin,
    {
        if let Some(interval) = self.node_config().dev.block_time {
            MiningMode::interval(interval)
        } else {
            MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
        }
    }
}

impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
where
    DB: Database + Clone + 'static,
    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
{
    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistency check
    /// between the database and static files. **It may execute a pipeline unwind if it fails this
    /// check.**
    pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let factory = ProviderFactory::new(
            self.right().clone(),
            self.chain_spec(),
            StaticFileProvider::read_write(self.data_dir().static_files())?,
        )
        .with_prune_modes(self.prune_modes())
        .with_static_files_metrics();

        let has_receipt_pruning =
            self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());

        // Check for consistency between database and static files. If it fails, it unwinds to
        // the first block that's consistent between database and static files.
        if let Some(unwind_target) = factory
            .static_file_provider()
            .check_consistency(&factory.provider()?, has_receipt_pruning)?
        {
            // Highly unlikely to happen, and given its destructive nature, it's better to panic
            // instead.
            assert_ne!(
                unwind_target,
                PipelineTarget::Unwind(0),
                "A static file <> database inconsistency was found that would trigger an unwind to block 0"
            );

            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

            // Builds an unwind-only pipeline
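            // Noop consensus, downloaders and EVM config are sufficient here because this
            // pipeline is only ever run with an unwind target, never to sync forward.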
            let pipeline = PipelineBuilder::default()
                .add_stages(DefaultStages::new(
                    factory.clone(),
                    tip_rx,
                    Arc::new(NoopConsensus::default()),
                    NoopHeaderDownloader::default(),
                    NoopBodiesDownloader::default(),
                    NoopEvmConfig::<Evm>::default(),
                    self.toml_config().stages.clone(),
                    self.prune_modes(),
                    None,
                ))
                .build(
                    factory.clone(),
                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
                );

            // Unwind to the target block
            let (tx, rx) = oneshot::channel();

            // Pipeline should be run as blocking and panic if it fails.
            self.task_executor().spawn_critical_blocking(
                "pipeline task",
                Box::pin(async move {
                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
                    let _ = tx.send(result);
                }),
            );
            rx.await?.inspect_err(|err| {
                error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
            })?;
        }

        Ok(factory)
    }

    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
    pub async fn with_provider_factory<N, Evm>(
        self,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let factory = self.create_provider_factory::<N, Evm>().await?;
        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| factory),
        };

        Ok(ctx)
    }
}

impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
where
    T: ProviderNodeTypes,
{
    /// Returns access to the underlying database.
    pub const fn database(&self) -> &T::DB {
        self.right().db_ref()
    }

    /// Returns the configured `ProviderFactory`.
    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
        self.right()
    }

    /// Returns the static file provider to interact with the static files.
    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
        self.right().static_file_provider()
    }

    /// Launches the prometheus endpoint.
    ///
    /// Convenience function for [`Self::start_prometheus_endpoint`].
    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
        self.start_prometheus_endpoint().await?;
        Ok(self)
    }

    /// Starts the prometheus endpoint.
    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
        // ensure recorder runs upkeep periodically
        install_prometheus_recorder().spawn_upkeep();

        let listen_addr = self.node_config().metrics;
        if let Some(addr) = listen_addr {
            info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
            let config = MetricServerConfig::new(
                addr,
                VersionInfo {
                    version: version_metadata().cargo_pkg_version.as_ref(),
                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
                    git_sha: version_metadata().vergen_git_sha.as_ref(),
                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
                    build_profile: version_metadata().build_profile_name.as_ref(),
                },
                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
                self.task_executor().clone(),
                Hooks::builder()
                    .with_hook({
                        let db = self.database().clone();
                        move || db.report_metrics()
                    })
                    .with_hook({
                        let sfp = self.static_file_provider();
                        move || {
                            if let Err(error) = sfp.report_metrics() {
                                error!(%error, "Failed to report metrics for the static file provider");
                            }
                        }
                    })
                    .build(),
            );

            MetricServer::new(config).serve().await?;
        }

        Ok(())
    }

    /// Convenience function for [`Self::init_genesis`].
    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
        init_genesis(self.provider_factory())?;
        Ok(self)
    }

    /// Write the genesis block and state if it has not already been written
    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
        init_genesis(self.provider_factory())
    }

    /// Creates a new `WithMeteredProvider` container and attaches it to the
    /// launch context.
    ///
    /// This spawns a metrics task that listens for metrics-related events and updates the metrics
    /// for prometheus.
    pub fn with_metrics_task(
        self,
    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
        let (metrics_sender, metrics_receiver) = unbounded_channel();

        let with_metrics =
            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };

        debug!(target: "reth::cli", "Spawning stages metrics listener task");
        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);

        LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| with_metrics),
        }
    }
}

impl<N, DB>
    LaunchContextWith<
        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
    >
where
    N: NodeTypes,
    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
{
    /// Returns the configured `ProviderFactory`.
    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
        &self.right().provider_factory
    }

    /// Returns the metrics sender.
    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().metrics_sender.clone()
    }

    /// Creates a `BlockchainProvider` and attaches it to the launch context.
    #[expect(clippy::complexity)]
    pub fn with_blockchain_db<T, F>(
        self,
        create_blockchain_provider: F,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
    where
        T: FullNodeTypes<Types = N, DB = DB>,
        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
    {
        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;

        let metered_providers = WithMeteredProviders {
            db_provider_container: WithMeteredProvider {
                provider_factory: self.provider_factory().clone(),
                metrics_sender: self.sync_metrics_tx(),
            },
            blockchain_db,
        };

        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| metered_providers),
        };

        Ok(ctx)
    }
}

impl<T>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
    >
where
    T: FullNodeTypes<Types: NodeTypesForProvider>,
{
    /// Returns access to the underlying database.
    pub const fn database(&self) -> &T::DB {
        self.provider_factory().db_ref()
    }

    /// Returns the configured `ProviderFactory`.
    pub const fn provider_factory(
        &self,
    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
        &self.right().db_provider_container.provider_factory
    }

    /// Fetches the head block from the database.
    ///
    /// If the database is empty, returns the genesis block.
    pub fn lookup_head(&self) -> eyre::Result<Head> {
        self.node_config()
            .lookup_head(self.provider_factory())
            .wrap_err("the head block is missing")
    }

    /// Returns the metrics sender.
    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().db_provider_container.metrics_sender.clone()
    }

    /// Returns a reference to the blockchain provider.
    pub const fn blockchain_db(&self) -> &T::Provider {
        &self.right().blockchain_db
    }

    /// Creates a `NodeAdapter` and attaches it to the launch context.
    pub async fn with_components<CB>(
        self,
        components_builder: CB,
        on_component_initialized: Box<
            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
        >,
    ) -> eyre::Result<
        LaunchContextWith<
            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
        >,
    >
    where
        CB: NodeComponentsBuilder<T>,
    {
        // fetch the head block from the database
        let head = self.lookup_head()?;

        let builder_ctx = BuilderContext::new(
            head,
            self.blockchain_db().clone(),
            self.task_executor().clone(),
            self.configs().clone(),
        );

        debug!(target: "reth::cli", "creating components");
        let components = components_builder.build_components(&builder_ctx).await?;

        let blockchain_db = self.blockchain_db().clone();

        let node_adapter = NodeAdapter {
            components,
            task_executor: self.task_executor().clone(),
            provider: blockchain_db,
        };

        debug!(target: "reth::cli", "calling on_component_initialized hook");
        on_component_initialized.on_event(node_adapter.clone())?;

        let components_container = WithComponents {
            db_provider_container: WithMeteredProvider {
                provider_factory: self.provider_factory().clone(),
                metrics_sender: self.sync_metrics_tx(),
            },
            node_adapter,
            head,
        };

        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| components_container),
        };

        Ok(ctx)
    }
}

impl<T, CB>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
    >
where
    T: FullNodeTypes<Types: NodeTypesForProvider>,
    CB: NodeComponentsBuilder<T>,
{
    /// Returns the configured `ProviderFactory`.
    pub const fn provider_factory(
        &self,
    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
        &self.right().db_provider_container.provider_factory
    }

    /// Returns the max block that the node should run to, looking it up from the network if
    /// necessary
    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
    where
        C: HeadersClient<Header: BlockHeader>,
    {
        self.node_config().max_block(client, self.provider_factory().clone()).await
    }

    /// Returns the static file provider to interact with the static files.
    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
        self.provider_factory().static_file_provider()
    }

    /// Creates a new [`StaticFileProducer`] with the attached database.
    pub fn static_file_producer(
        &self,
    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
    }

    /// Returns the current head block.
    pub const fn head(&self) -> Head {
        self.right().head
    }

    /// Returns the configured `NodeAdapter`.
    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
        &self.right().node_adapter
    }

    /// Returns mutable reference to the configured `NodeAdapter`.
    pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
        &mut self.right_mut().node_adapter
    }

    /// Returns a reference to the blockchain provider.
    pub const fn blockchain_db(&self) -> &T::Provider {
        &self.node_adapter().provider
    }

    /// Returns the initial backfill target to sync to at launch.
    ///
    /// This returns the configured `debug.tip` if set, otherwise it checks whether backfill was
    /// previously interrupted and returns the block hash of the last checkpoint; see also
    /// [`Self::check_pipeline_consistency`].
    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
        let mut initial_target = self.node_config().debug.tip;

        if initial_target.is_none() {
            initial_target = self.check_pipeline_consistency()?;
        }

        Ok(initial_target)
    }

    /// Returns true if the node should terminate after the initial backfill run.
    ///
    /// This is the case if any of these configs are set:
    ///  `--debug.max-block`
    ///  `--debug.terminate`
    pub const fn terminate_after_initial_backfill(&self) -> bool {
        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
    }

    /// Ensures that the database matches chain-specific requirements.
    ///
    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
    /// bedrock height)
    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
        if self.chain_spec().is_optimism() &&
            !self.is_dev() &&
            self.chain_id() == Chain::optimism_mainnet()
        {
            let latest = self.blockchain_db().last_block_number()?;
            // bedrock height
            if latest < 105235063 {
                error!(
                    "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
                );
                return Err(ProviderError::BestBlockNotFound)
            }
        }

        Ok(())
    }

    /// Checks whether the pipeline is consistent, i.e. all stages have checkpoint block numbers
    /// that are no less than the checkpoint of the first stage.
    ///
    /// This will return the pipeline target if:
    ///  * the pipeline was interrupted during its previous run
    ///  * a new stage was added
    ///  * stage data was dropped manually through `reth stage drop ...`
    ///
    /// # Returns
    ///
    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
        // If no target was provided, check if the stages are congruent - i.e. whether the
        // checkpoints of all later stages line up with the checkpoint of the first stage.
        let first_stage_checkpoint = self
            .blockchain_db()
            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
            .unwrap_or_default()
            .block_number;

        // Skip the first stage as we've already retrieved it and are comparing all other
        // checkpoints against it.
        for stage_id in StageId::ALL.iter().skip(1) {
            let stage_checkpoint = self
                .blockchain_db()
                .get_stage_checkpoint(*stage_id)?
                .unwrap_or_default()
                .block_number;

            // If the checkpoint of any stage is less than the checkpoint of the first stage,
            // retrieve and return the block hash of the latest header and use it as the target.
            if stage_checkpoint < first_stage_checkpoint {
                debug!(
                    target: "consensus::engine",
                    first_stage_checkpoint,
                    inconsistent_stage_id = %stage_id,
                    inconsistent_stage_checkpoint = stage_checkpoint,
                    "Pipeline sync progress is inconsistent"
                );
                return self.blockchain_db().block_hash(first_stage_checkpoint);
            }
        }

        self.ensure_chain_specific_db_checks()?;

        Ok(None)
    }

    /// Expire the pre-merge transactions if the node is configured to do so and the chain has a
    /// merge block.
    ///
    /// If the node is configured to prune pre-merge transactions and it has synced past the merge
    /// block, it will delete the pre-merge transaction static files if they still exist.
    pub fn expire_pre_merge_transactions(&self) -> eyre::Result<()>
    where
        T: FullNodeTypes<Provider: StaticFileProviderFactory>,
    {
        if self.node_config().pruning.bodies_pre_merge &&
            let Some(merge_block) = self
                .chain_spec()
                .ethereum_fork_activation(EthereumHardfork::Paris)
                .block_number()
        {
            // Ensure we only expire transactions after we synced past the merge block.
            let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
            if latest.number() > merge_block {
                let provider = self.blockchain_db().static_file_provider();
                if provider
                    .get_lowest_transaction_static_file_block()
                    .is_some_and(|lowest| lowest < merge_block)
                {
                    info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
                    provider.delete_transactions_below(merge_block)?;
                } else {
                    debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
                }
            }
        }

        Ok(())
    }

    /// Returns the metrics sender.
    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().db_provider_container.metrics_sender.clone()
    }

    /// Returns the node adapter components.
    pub const fn components(&self) -> &CB::Components {
        &self.node_adapter().components
    }

    /// Launches ExEx (Execution Extensions) and returns the ExEx manager handle.
    #[allow(clippy::type_complexity)]
    pub async fn launch_exex(
        &self,
        installed_exex: Vec<(
            String,
            Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
        )>,
    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
        ExExLauncher::new(
            self.head(),
            self.node_adapter().clone(),
            installed_exex,
            self.configs().clone(),
        )
        .launch()
        .await
    }

    /// Creates the ERA import source based on node configuration.
    ///
    /// Returns `Some(EraImportSource)` if ERA is enabled in the node config, otherwise `None`.
    pub fn era_import_source(&self) -> Option<EraImportSource> {
        let node_config = self.node_config();
        if !node_config.era.enabled {
            return None;
        }

        EraImportSource::maybe_new(
            node_config.era.source.path.clone(),
            node_config.era.source.url.clone(),
            || node_config.chain.chain().kind().default_era_host(),
            || node_config.datadir().data_dir().join("era").into(),
        )
    }

    /// Creates consensus layer health events stream based on node configuration.
    ///
    /// Returns a stream that monitors consensus layer health if:
    /// - No debug tip is configured
    /// - Not running in dev mode
    ///
    /// Otherwise returns an empty stream.
    pub fn consensus_layer_events(
        &self,
    ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
    where
        T::Provider: reth_provider::CanonChainTracker,
    {
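        // `Either` is used so that both branches resolve to the same concrete stream type.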
        if self.node_config().debug.tip.is_none() && !self.is_dev() {
            Either::Left(
                ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
                    .map(Into::into),
            )
        } else {
            Either::Right(stream::empty())
        }
    }

    /// Spawns the [`EthStatsService`] if configured.
    pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
        let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };

        let network = self.components().network().clone();
        let pool = self.components().pool().clone();
        let provider = self.node_adapter().provider.clone();

        info!(target: "reth::cli", "Starting EthStats service at {}", url);

        let ethstats = EthStatsService::new(url, network, provider, pool).await?;
        tokio::spawn(async move { ethstats.run().await });

        Ok(())
    }
}

/// Joins two attachments together, preserving access to both values.
///
/// This type enables the launch process to accumulate state while maintaining
/// access to all previously attached components. The `left` field holds the
/// previous state, while `right` holds the newly attached component.
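///
/// # Example
///
/// An illustrative sketch of how the launch methods evolve the attachment while keeping the
/// accumulated state; `configs`, `db` and `open_provider_factory` are placeholders for values
/// and helpers provided by the surrounding launch code:
///
/// ```ignore
/// let attached = Attached::new(configs, db);
/// // Swap the right-hand value (e.g. the raw database for a provider factory)
/// // while keeping the left-hand state untouched.
/// let attached = attached.map_right(|db| open_provider_factory(db));
/// let configs = attached.left(); // the previously attached state is still accessible
/// ```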
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new `Attached` with the given values.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Maps the left value to a new value.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        Attached::new(f(self.left), self.right)
    }

    /// Maps the right value to a new value.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        Attached::new(self.left, f(self.right))
    }

    /// Get a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Get a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Get a mutable reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Get a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}

/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
/// reth.toml config
#[derive(Debug)]
pub struct WithConfigs<ChainSpec> {
    /// The node config, usually derived from the CLI.
    pub config: NodeConfig<ChainSpec>,
    /// The loaded reth.toml config.
    pub toml_config: reth_config::Config,
}

impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
    fn clone(&self) -> Self {
        Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
    }
}

/// Helper container type to bundle the [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    provider_factory: ProviderFactory<N>,
    metrics_sender: UnboundedSender<MetricEvent>,
}

/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
/// and a metrics sender.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    blockchain_db: T::Provider,
}

/// Helper container to bundle the metered providers container and [`NodeAdapter`].
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    node_adapter: NodeAdapter<T, CB::Components>,
    head: Head,
}

#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    const EXTENSION: &str = "toml";

    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
        proc(&config_path);
        temp_dir.close().unwrap()
    }

    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let mut reth_config = Config::default();
            let node_config = NodeConfig {
                pruning: PruningArgs {
                    full: true,
                    block_interval: None,
                    sender_recovery_full: false,
                    sender_recovery_distance: None,
                    sender_recovery_before: None,
                    transaction_lookup_full: false,
                    transaction_lookup_distance: None,
                    transaction_lookup_before: None,
                    receipts_full: false,
                    receipts_pre_merge: false,
                    receipts_distance: None,
                    receipts_before: None,
                    account_history_full: false,
                    account_history_distance: None,
                    account_history_before: None,
                    storage_history_full: false,
                    storage_history_distance: None,
                    storage_history_before: None,
                    bodies_pre_merge: false,
                    bodies_distance: None,
                    receipts_log_filter: None,
                    bodies_before: None,
                },
                ..NodeConfig::test()
            };
            LaunchContext::save_pruning_config_if_full_node(
                &mut reth_config,
                &node_config,
                config_path,
            )
            .unwrap();

            let loaded_config = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, loaded_config);
        })
    }
}