reth_node_builder/launch/common.rs

//! Helper types that can be used by launchers.

use crate::{
    components::{NodeComponents, NodeComponentsBuilder},
    hooks::OnComponentInitializedHook,
    BuilderContext, NodeAdapter,
};
use alloy_eips::eip2124::Head;
use alloy_primitives::{BlockNumber, B256};
use eyre::{Context, OptionExt};
use rayon::ThreadPoolBuilder;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
use reth_db_common::init::{init_genesis, InitStorageError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_engine_local::MiningMode;
use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
use reth_fs_util as fs;
use reth_invalid_block_hooks::InvalidBlockWitnessHook;
use reth_network_p2p::headers::client::HeadersClient;
use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::{
    args::InvalidBlockHookType,
    dirs::{ChainPath, DataDirPath},
    node_config::NodeConfig,
    primitives::BlockHeader,
    version::{
        BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
        VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
    },
};
use reth_node_metrics::{
    chain::ChainSpecInfo,
    hooks::Hooks,
    recorder::install_prometheus_recorder,
    server::{MetricServer, MetricServerConfig},
    version::VersionInfo,
};
use reth_provider::{
    providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
    BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
    ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_api::clients::EthApiClient;
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use reth_transaction_pool::TransactionPool;
use std::{sync::Arc, thread::available_parallelism};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedSender},
    oneshot, watch,
};

/// Reusable setup for launching a node.
///
/// This provides commonly used boilerplate for launching a node.
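///
/// # Example
///
/// A minimal usage sketch (`task_executor`, `data_dir` and `node_config` are placeholders for
/// values the caller already has):
///
/// ```ignore
/// let ctx = LaunchContext::new(task_executor, data_dir)
///     // raise the fd limit and configure the global rayon pool, reserving 2 cores for the OS
///     .with_configured_globals(2)
///     // load reth.toml, apply CLI overrides and attach both configs
///     .with_loaded_toml_config(node_config)?;
/// ```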
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// The task executor for the node.
    pub task_executor: TaskExecutor,
    /// The data directory for the node.
    pub data_dir: ChainPath<DataDirPath>,
}

impl LaunchContext {
    /// Create a new instance of the default node launcher.
    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
        Self { task_executor, data_dir }
    }

    /// Create launch context with attachment.
    pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
        LaunchContextWith { inner: self, attachment }
    }

    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
    /// `config`.
    ///
    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
    pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
        self,
        config: NodeConfig<ChainSpec>,
    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
        let toml_config = self.load_toml_config(&config)?;
        Ok(self.with(WithConfigs { config, toml_config }))
    }

    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
    /// `config`.
    ///
    /// If the node is configured as a full node, the pruning settings are also persisted back to
    /// the config file.
    pub fn load_toml_config<ChainSpec: EthChainSpec>(
        &self,
        config: &NodeConfig<ChainSpec>,
    ) -> eyre::Result<reth_config::Config> {
        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());

        let mut toml_config = reth_config::Config::from_path(&config_path)
            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;

        Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;

        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");

        // Update the config with the command line arguments
        toml_config.peers.trusted_nodes_only = config.network.trusted_only;

        Ok(toml_config)
    }

    /// Save the prune config to the toml file if the node is a full node.
    fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
        reth_config: &mut reth_config::Config,
        config: &NodeConfig<ChainSpec>,
        config_path: impl AsRef<std::path::Path>,
    ) -> eyre::Result<()> {
        if reth_config.prune.is_none() {
            if let Some(prune_config) = config.prune_config() {
                reth_config.update_prune_config(prune_config);
                info!(target: "reth::cli", "Saving prune config to toml file");
                reth_config.save(config_path.as_ref())?;
            }
        } else if config.prune_config().is_none() {
            warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
        }
        Ok(())
    }

    /// Convenience function to [`Self::configure_globals`]
    pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
        self.configure_globals(reserved_cpu_cores);
        self
    }

    /// Configure global settings. This includes:
    ///
    /// - Raising the file descriptor limit
    /// - Configuring the global rayon thread pool with the available parallelism, honoring
    ///   `engine.reserved-cpu-cores` to reserve the given number of cores for the OS while using
    ///   at least 1 core for the rayon thread pool
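    ///
    /// # Example
    ///
    /// An illustrative sketch of the resulting rayon thread count (the numbers are made up):
    ///
    /// ```ignore
    /// // 16 available cores, 2 reserved for the OS => rayon gets 14 threads
    /// // 4 available cores, 8 reserved             => rayon still gets 1 thread
    /// let num_threads = available_parallelism()
    ///     .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
    /// ```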
    pub fn configure_globals(&self, reserved_cpu_cores: usize) {
        // Raise the fd limit of the process.
        // Does not do anything on windows.
        match fdlimit::raise_fd_limit() {
            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
                debug!(from, to, "Raised file descriptor limit");
            }
            Ok(fdlimit::Outcome::Unsupported) => {}
            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
        }

        // Reserve the given number of CPU cores for the rest of the OS.
        // Users can reserve more cores by setting engine.reserved-cpu-cores
        // Note: The global rayon thread pool will use at least one core.
        let num_threads = available_parallelism()
            .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
        if let Err(err) = ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .thread_name(|i| format!("reth-rayon-{i}"))
            .build_global()
        {
            error!(%err, "Failed to build global thread pool")
        }
    }
}

/// A [`LaunchContext`] along with an additional value.
///
/// This can be used to sequentially attach additional values to the type during the launch process.
///
/// The type provides common boilerplate for launching a node depending on the additional value.
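///
/// # Example
///
/// A sketch of how values are accumulated during launch (`task_executor`, `data_dir`,
/// `node_config` and `database` are placeholders):
///
/// ```ignore
/// let ctx = LaunchContext::new(task_executor, data_dir)
///     .with(node_config)   // LaunchContextWith<NodeConfig<_>>
///     .attach(database);   // LaunchContextWith<Attached<NodeConfig<_>, DB>>
///
/// // both attachments stay accessible
/// let _config = ctx.left();
/// let _db = ctx.right();
/// ```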
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped launch context.
    pub inner: LaunchContext,
    /// The additional attached value.
    pub attachment: T,
}

impl<T> LaunchContextWith<T> {
    /// Configure global settings. This includes:
    ///
    /// - Raising the file descriptor limit
    /// - Configuring the global rayon thread pool
    pub fn configure_globals(&self, reserved_cpu_cores: usize) {
        self.inner.configure_globals(reserved_cpu_cores);
    }

    /// Returns the data directory.
    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
        &self.inner.data_dir
    }

    /// Returns the task executor.
    pub const fn task_executor(&self) -> &TaskExecutor {
        &self.inner.task_executor
    }

    /// Attaches another value to the launch context.
    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
        LaunchContextWith {
            inner: self.inner,
            attachment: Attached::new(self.attachment, attachment),
        }
    }

    /// Consumes the type and calls a function with a reference to the context.
    ///
    /// Returns the context again.
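    ///
    /// A small sketch (the log line is purely illustrative):
    ///
    /// ```ignore
    /// let ctx = ctx.inspect(|this| {
    ///     info!(target: "reth::cli", path = ?this.data_dir(), "Using datadir");
    /// });
    /// ```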
    pub fn inspect<F>(self, f: F) -> Self
    where
        F: FnOnce(&Self),
    {
        f(&self);
        self
    }
}

impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
    /// Resolves the trusted peers and adds them to the toml config.
    pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
        if !self.attachment.config.network.trusted_peers.is_empty() {
            info!(target: "reth::cli", "Adding trusted nodes");

            self.attachment
                .toml_config
                .peers
                .trusted_nodes
                .extend(self.attachment.config.network.trusted_peers.clone());
        }
        Ok(self)
    }
}

impl<L, R> LaunchContextWith<Attached<L, R>> {
    /// Get a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.attachment.left
    }

    /// Get a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.attachment.right
    }

    /// Get a mutable reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.attachment.left
    }

    /// Get a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.attachment.right
    }
}
impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
    /// Adjust certain settings in the config to make sure they are set correctly
    ///
    /// This includes:
    /// - Making sure the ETL dir is set to the datadir
    /// - RPC settings are adjusted to the correct port
    pub fn with_adjusted_configs(self) -> Self {
        self.ensure_etl_datadir().with_adjusted_instance_ports()
    }

    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
    pub fn ensure_etl_datadir(mut self) -> Self {
        if self.toml_config_mut().stages.etl.dir.is_none() {
            let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
            if etl_path.exists() {
                // Remove etl-path files on launch
                if let Err(err) = fs::remove_dir_all(&etl_path) {
                    warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
                }
            }
            self.toml_config_mut().stages.etl.dir = Some(etl_path);
        }

        self
    }

    /// Change rpc port numbers based on the instance number.
    pub fn with_adjusted_instance_ports(mut self) -> Self {
        self.node_config_mut().adjust_instance_ports();
        self
    }

    /// Returns the container for all config types
    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
        self.attachment.left()
    }

    /// Returns the attached [`NodeConfig`].
    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
        &self.left().config
    }

    /// Returns the attached [`NodeConfig`].
    pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
        &mut self.left_mut().config
    }

    /// Returns the attached toml config [`reth_config::Config`].
    pub const fn toml_config(&self) -> &reth_config::Config {
        &self.left().toml_config
    }

    /// Returns the attached toml config [`reth_config::Config`].
    pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
        &mut self.left_mut().toml_config
    }

    /// Returns the configured chain spec.
    pub fn chain_spec(&self) -> Arc<ChainSpec> {
        self.node_config().chain.clone()
    }

    /// Get the hash of the genesis block.
    pub fn genesis_hash(&self) -> B256 {
        self.node_config().chain.genesis_hash()
    }

    /// Returns the chain identifier of the node.
    pub fn chain_id(&self) -> Chain {
        self.node_config().chain.chain()
    }

    /// Returns true if the node is configured as --dev
    pub const fn is_dev(&self) -> bool {
        self.node_config().dev.dev
    }

    /// Returns the configured [`PruneConfig`]
    ///
    /// Any configuration set in CLI will take precedence over those set in toml
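    ///
    /// A short sketch of how the merged result is typically consumed:
    ///
    /// ```ignore
    /// let config = ctx.prune_config();   // CLI flags merged over the reth.toml `[prune]` section
    /// let modes = ctx.prune_modes();     // `config.segments`, or the default if `None`
    /// ```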
    pub fn prune_config(&self) -> Option<PruneConfig> {
        let Some(mut node_prune_config) = self.node_config().prune_config() else {
            // No CLI config is set, use the toml config.
            return self.toml_config().prune.clone();
        };

        // Otherwise, use the CLI configuration and merge with toml config.
        node_prune_config.merge(self.toml_config().prune.clone());
        Some(node_prune_config)
    }

    /// Returns the configured [`PruneModes`], returning the default if no config was available.
    pub fn prune_modes(&self) -> PruneModes {
        self.prune_config().map(|config| config.segments).unwrap_or_default()
    }

    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
    pub fn pruner_builder(&self) -> PrunerBuilder {
        PrunerBuilder::new(self.prune_config().unwrap_or_default())
            .delete_limit(self.chain_spec().prune_delete_limit())
            .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
    }

    /// Loads the JWT secret for the engine API
    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
        let default_jwt_path = self.data_dir().jwt();
        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
        Ok(secret)
    }

    /// Returns the [`MiningMode`] intended for --dev mode.
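    ///
    /// A sketch of how the mode is picked (`pool` is any [`TransactionPool`] implementation):
    ///
    /// ```ignore
    /// // `--dev.block-time 2s` => interval mining; otherwise mine as soon as transactions arrive
    /// let mining_mode = ctx.dev_mining_mode(pool);
    /// ```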
    pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
        if let Some(interval) = self.node_config().dev.block_time {
            MiningMode::interval(interval)
        } else {
            MiningMode::instant(pool)
        }
    }
}

impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
where
    DB: Database + Clone + 'static,
    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
{
    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistency
    /// check between the database and static files. **It may execute a pipeline unwind if it
    /// fails this check.**
    pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let factory = ProviderFactory::new(
            self.right().clone(),
            self.chain_spec(),
            StaticFileProvider::read_write(self.data_dir().static_files())?,
        )
        .with_prune_modes(self.prune_modes())
        .with_static_files_metrics();

        let has_receipt_pruning =
            self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());

        // Check for consistency between database and static files. If it fails, it unwinds to
        // the first block that's consistent between database and static files.
        if let Some(unwind_target) = factory
            .static_file_provider()
            .check_consistency(&factory.provider()?, has_receipt_pruning)?
        {
            // Highly unlikely to happen, and given its destructive nature, it's better to panic
            // instead.
            assert_ne!(
                unwind_target,
                PipelineTarget::Unwind(0),
                "A static file <> database inconsistency was found that would trigger an unwind to block 0"
            );

            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

            // Builds an unwind-only pipeline
            let pipeline = PipelineBuilder::default()
                .add_stages(DefaultStages::new(
                    factory.clone(),
                    tip_rx,
                    Arc::new(NoopConsensus::default()),
                    NoopHeaderDownloader::default(),
                    NoopBodiesDownloader::default(),
                    NoopEvmConfig::<Evm>::default(),
                    self.toml_config().stages.clone(),
                    self.prune_modes(),
                    None,
                ))
                .build(
                    factory.clone(),
                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
                );

            // Unwinds to the target block
            let (tx, rx) = oneshot::channel();

            // Pipeline should be run as blocking and panic if it fails.
            self.task_executor().spawn_critical_blocking(
                "pipeline task",
                Box::pin(async move {
                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
                    let _ = tx.send(result);
                }),
            );
            rx.await?.inspect_err(|err| {
                error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
            })?;
        }

        Ok(factory)
    }

    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
    pub async fn with_provider_factory<N, Evm>(
        self,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let factory = self.create_provider_factory::<N, Evm>().await?;
        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| factory),
        };

        Ok(ctx)
    }
}

impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
where
    T: ProviderNodeTypes,
{
    /// Returns access to the underlying database.
    pub const fn database(&self) -> &T::DB {
        self.right().db_ref()
    }

    /// Returns the configured `ProviderFactory`.
    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
        self.right()
    }

    /// Returns the static file provider to interact with the static files.
    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
        self.right().static_file_provider()
    }

    /// This launches the prometheus endpoint.
    ///
    /// Convenience function to [`Self::start_prometheus_endpoint`]
    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
        self.start_prometheus_endpoint().await?;
        Ok(self)
    }

    /// Starts the prometheus endpoint.
    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
        // ensure recorder runs upkeep periodically
        install_prometheus_recorder().spawn_upkeep();

        let listen_addr = self.node_config().metrics;
        if let Some(addr) = listen_addr {
            info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
            let config = MetricServerConfig::new(
                addr,
                VersionInfo {
                    version: CARGO_PKG_VERSION,
                    build_timestamp: VERGEN_BUILD_TIMESTAMP,
                    cargo_features: VERGEN_CARGO_FEATURES,
                    git_sha: VERGEN_GIT_SHA,
                    target_triple: VERGEN_CARGO_TARGET_TRIPLE,
                    build_profile: BUILD_PROFILE_NAME,
                },
                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
                self.task_executor().clone(),
                Hooks::builder()
                    .with_hook({
                        let db = self.database().clone();
                        move || db.report_metrics()
                    })
                    .with_hook({
                        let sfp = self.static_file_provider();
                        move || {
                            if let Err(error) = sfp.report_metrics() {
                                error!(%error, "Failed to report metrics for the static file provider");
                            }
                        }
                    })
                    .build(),
            );

            MetricServer::new(config).serve().await?;
        }

        Ok(())
    }

    /// Convenience function to [`Self::init_genesis`]
    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
        init_genesis(self.provider_factory())?;
        Ok(self)
    }

    /// Write the genesis block and state if it has not already been written
    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
        init_genesis(self.provider_factory())
    }

    /// Creates a new `WithMeteredProvider` container and attaches it to the
    /// launch context.
    ///
    /// This spawns a metrics task that listens for metrics related events and updates metrics for
    /// prometheus.
    pub fn with_metrics_task(
        self,
    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
        let (metrics_sender, metrics_receiver) = unbounded_channel();

        let with_metrics =
            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };

        debug!(target: "reth::cli", "Spawning stages metrics listener task");
        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);

        LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| with_metrics),
        }
    }
}

impl<N, DB>
    LaunchContextWith<
        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
    >
where
    N: NodeTypes,
    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
{
    /// Returns the configured `ProviderFactory`.
    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
        &self.right().provider_factory
    }

    /// Returns the metrics sender.
    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().metrics_sender.clone()
    }

    /// Creates a `BlockchainProvider` and attaches it to the launch context.
    #[expect(clippy::complexity)]
    pub fn with_blockchain_db<T, F>(
        self,
        create_blockchain_provider: F,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
    where
        T: FullNodeTypes<Types = N, DB = DB>,
        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
    {
        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;

        let metered_providers = WithMeteredProviders {
            db_provider_container: WithMeteredProvider {
                provider_factory: self.provider_factory().clone(),
                metrics_sender: self.sync_metrics_tx(),
            },
            blockchain_db,
        };

        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| metered_providers),
        };

        Ok(ctx)
    }
}

impl<T>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
    >
where
    T: FullNodeTypes<Types: NodeTypesForProvider>,
{
    /// Returns access to the underlying database.
    pub const fn database(&self) -> &T::DB {
        self.provider_factory().db_ref()
    }

    /// Returns the configured `ProviderFactory`.
    pub const fn provider_factory(
        &self,
    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
        &self.right().db_provider_container.provider_factory
    }

    /// Fetches the head block from the database.
    ///
    /// If the database is empty, returns the genesis block.
    pub fn lookup_head(&self) -> eyre::Result<Head> {
        self.node_config()
            .lookup_head(self.provider_factory())
            .wrap_err("the head block is missing")
    }

    /// Returns the metrics sender.
    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().db_provider_container.metrics_sender.clone()
    }

    /// Returns a reference to the blockchain provider.
    pub const fn blockchain_db(&self) -> &T::Provider {
        &self.right().blockchain_db
    }

    /// Creates a `NodeAdapter` and attaches it to the launch context.
    pub async fn with_components<CB>(
        self,
        components_builder: CB,
        on_component_initialized: Box<
            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
        >,
    ) -> eyre::Result<
        LaunchContextWith<
            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
        >,
    >
    where
        CB: NodeComponentsBuilder<T>,
    {
        // fetch the head block from the database
        let head = self.lookup_head()?;

        let builder_ctx = BuilderContext::new(
            head,
            self.blockchain_db().clone(),
            self.task_executor().clone(),
            self.configs().clone(),
        );

        debug!(target: "reth::cli", "creating components");
        let components = components_builder.build_components(&builder_ctx).await?;

        let blockchain_db = self.blockchain_db().clone();

        let node_adapter = NodeAdapter {
            components,
            task_executor: self.task_executor().clone(),
            provider: blockchain_db,
        };

        debug!(target: "reth::cli", "calling on_component_initialized hook");
        on_component_initialized.on_event(node_adapter.clone())?;

        let components_container = WithComponents {
            db_provider_container: WithMeteredProvider {
                provider_factory: self.provider_factory().clone(),
                metrics_sender: self.sync_metrics_tx(),
            },
            node_adapter,
            head,
        };

        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| components_container),
        };

        Ok(ctx)
    }
}

impl<T, CB>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
    >
where
    T: FullNodeTypes<Types: NodeTypesForProvider>,
    CB: NodeComponentsBuilder<T>,
{
    /// Returns the configured `ProviderFactory`.
    pub const fn provider_factory(
        &self,
    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
        &self.right().db_provider_container.provider_factory
    }

    /// Returns the max block that the node should run to, looking it up from the network if
    /// necessary
    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
    where
        C: HeadersClient<Header: BlockHeader>,
    {
        self.node_config().max_block(client, self.provider_factory().clone()).await
    }

    /// Returns the static file provider to interact with the static files.
    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
        self.provider_factory().static_file_provider()
    }

    /// Creates a new [`StaticFileProducer`] with the attached database.
    pub fn static_file_producer(
        &self,
    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
    }

    /// Returns the current head block.
    pub const fn head(&self) -> Head {
        self.right().head
    }

    /// Returns the configured `NodeAdapter`.
    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
        &self.right().node_adapter
    }

    /// Returns a mutable reference to the configured `NodeAdapter`.
    pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
        &mut self.right_mut().node_adapter
    }

    /// Returns a reference to the blockchain provider.
    pub const fn blockchain_db(&self) -> &T::Provider {
        &self.node_adapter().provider
    }

    /// Returns the initial backfill target to sync to at launch.
    ///
    /// This returns the configured `debug.tip` if set. Otherwise it checks whether backfill was
    /// previously interrupted and, if so, returns the block hash of the last checkpoint; see also
    /// [`Self::check_pipeline_consistency`].
    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
        let mut initial_target = self.node_config().debug.tip;

        if initial_target.is_none() {
            initial_target = self.check_pipeline_consistency()?;
        }

        Ok(initial_target)
    }

    /// Returns true if the node should terminate after the initial backfill run.
    ///
    /// This is the case if any of these configs are set:
    ///  - `--debug.max-block`
    ///  - `--debug.terminate`
    pub const fn terminate_after_initial_backfill(&self) -> bool {
        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
    }

    /// Ensures that the database matches chain-specific requirements.
    ///
    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
    /// bedrock height)
    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
        if self.chain_spec().is_optimism() &&
            !self.is_dev() &&
            self.chain_id() == Chain::optimism_mainnet()
        {
            let latest = self.blockchain_db().last_block_number()?;
            // bedrock height
            if latest < 105235063 {
                error!(
                    "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
                );
                return Err(ProviderError::BestBlockNotFound)
            }
        }

        Ok(())
    }

    /// Check if the pipeline is consistent (all stages have checkpoint block numbers no less
    /// than the checkpoint of the first stage).
    ///
    /// This will return the pipeline target if:
    ///  * the pipeline was interrupted during its previous run
    ///  * a new stage was added
    ///  * stage data was dropped manually through `reth stage drop ...`
    ///
    /// # Returns
    ///
    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
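    ///
    /// # Example
    ///
    /// An illustrative scenario (the block numbers are made up):
    ///
    /// ```ignore
    /// // Headers (first stage) checkpoint:    1_000_000
    /// // Execution checkpoint after a crash:    800_000
    /// //
    /// // 800_000 < 1_000_000, so the hash of block 1_000_000 is returned and backfill re-runs
    /// // up to the first stage's checkpoint.
    /// let target = ctx.check_pipeline_consistency()?;
    /// ```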
    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
        // If no target was provided, check if the stages are congruent, i.e. whether the
        // checkpoints of all other stages match the checkpoint of the first stage.
        let first_stage_checkpoint = self
            .blockchain_db()
            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
            .unwrap_or_default()
            .block_number;

        // Skip the first stage as we've already retrieved it and are comparing all other
        // checkpoints against it.
        for stage_id in StageId::ALL.iter().skip(1) {
            let stage_checkpoint = self
                .blockchain_db()
                .get_stage_checkpoint(*stage_id)?
                .unwrap_or_default()
                .block_number;

            // If the checkpoint of any stage is less than the checkpoint of the first stage,
            // retrieve the block hash at the first stage's checkpoint and use it as the target.
            if stage_checkpoint < first_stage_checkpoint {
                debug!(
                    target: "consensus::engine",
                    first_stage_checkpoint,
                    inconsistent_stage_id = %stage_id,
                    inconsistent_stage_checkpoint = stage_checkpoint,
                    "Pipeline sync progress is inconsistent"
                );
                return self.blockchain_db().block_hash(first_stage_checkpoint);
            }
        }

        self.ensure_chain_specific_db_checks()?;

        Ok(None)
    }

    /// Returns the metrics sender.
    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().db_provider_container.metrics_sender.clone()
    }

    /// Returns the node adapter components.
    pub const fn components(&self) -> &CB::Components {
        &self.node_adapter().components
    }
}

impl<T, CB>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
    >
where
    T: FullNodeTypes<
        Provider: StateProviderFactory + ChainSpecProvider,
        Types: NodeTypesForProvider,
    >,
    CB: NodeComponentsBuilder<T>,
{
    /// Returns the [`InvalidBlockHook`] to use for the node.
    pub fn invalid_block_hook(
        &self,
    ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
        let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
            return Ok(Box::new(NoopInvalidBlockHook::default()))
        };
        let healthy_node_rpc_client = self.get_healthy_node_client()?;

        let output_directory = self.data_dir().invalid_block_hooks();
        let hooks = hook
            .iter()
            .copied()
            .map(|hook| {
                let output_directory = output_directory.join(hook.to_string());
                fs::create_dir_all(&output_directory)?;

                Ok(match hook {
                    InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
                        self.blockchain_db().clone(),
                        self.components().evm_config().clone(),
                        output_directory,
                        healthy_node_rpc_client.clone(),
                    )),
                    InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
                        eyre::bail!("invalid block hook {hook:?} is not implemented yet")
                    }
                } as Box<dyn InvalidBlockHook<_>>)
            })
            .collect::<Result<_, _>>()?;

        Ok(Box::new(InvalidBlockHooks(hooks)))
    }

    /// Returns an RPC client for the healthy node, if configured in the node config.
    fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
        self.node_config()
            .debug
            .healthy_node_rpc_url
            .as_ref()
            .map(|url| {
                let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;

                // Verify that the healthy node is running the same chain as the current node.
                let chain_id = futures::executor::block_on(async {
                    EthApiClient::<
                        alloy_rpc_types::Transaction,
                        alloy_rpc_types::Block,
                        alloy_rpc_types::Receipt,
                        alloy_rpc_types::Header,
                    >::chain_id(&client)
                    .await
                })?
                .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
                if chain_id.to::<u64>() != self.chain_id().id() {
                    eyre::bail!("invalid chain id for healthy node: {chain_id}")
                }

                Ok(client)
            })
            .transpose()
    }
}

/// Joins two attachments together.
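///
/// # Example
///
/// A minimal sketch of the pairing helpers:
///
/// ```ignore
/// let pair = Attached::new("left", 42u64);
/// assert_eq!(*pair.left(), "left");
///
/// let pair = pair.map_right(|n| n + 1);
/// assert_eq!(*pair.right(), 43);
/// ```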
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new `Attached` with the given values.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Maps the left value to a new value.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        Attached::new(f(self.left), self.right)
    }

    /// Maps the right value to a new value.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        Attached::new(self.left, f(self.right))
    }

    /// Get a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Get a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Get a mutable reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Get a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}

/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
/// reth.toml config
#[derive(Debug)]
pub struct WithConfigs<ChainSpec> {
    /// The node config, usually derived from the CLI.
    pub config: NodeConfig<ChainSpec>,
    /// The loaded reth.toml config.
    pub toml_config: reth_config::Config,
}

impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
    fn clone(&self) -> Self {
        Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
    }
}

/// Helper container type to bundle the [`ProviderFactory`] and the metrics
/// sender.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    provider_factory: ProviderFactory<N>,
    metrics_sender: UnboundedSender<MetricEvent>,
}

/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
/// and a metrics sender.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    blockchain_db: T::Provider,
}

/// Helper container to bundle the metered providers container and [`NodeAdapter`].
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    node_adapter: NodeAdapter<T, CB::Components>,
    head: Head,
}

#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    const EXTENSION: &str = "toml";

    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
        proc(&config_path);
        temp_dir.close().unwrap()
    }

    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let mut reth_config = Config::default();
            let node_config = NodeConfig {
                pruning: PruningArgs {
                    full: true,
                    block_interval: None,
                    sender_recovery_full: false,
                    sender_recovery_distance: None,
                    sender_recovery_before: None,
                    transaction_lookup_full: false,
                    transaction_lookup_distance: None,
                    transaction_lookup_before: None,
                    receipts_full: false,
                    receipts_distance: None,
                    receipts_before: None,
                    account_history_full: false,
                    account_history_distance: None,
                    account_history_before: None,
                    storage_history_full: false,
                    storage_history_distance: None,
                    storage_history_before: None,
                    receipts_log_filter: None,
                },
                ..NodeConfig::test()
            };
            LaunchContext::save_pruning_config_if_full_node(
                &mut reth_config,
                &node_config,
                config_path,
            )
            .unwrap();

            let loaded_config = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, loaded_config);
        })
    }
}