reth_node_builder/launch/common.rs

1//! Helper types that can be used by launchers.
2
3use std::{sync::Arc, thread::available_parallelism};
4
5use crate::{
6    components::{NodeComponents, NodeComponentsBuilder},
7    hooks::OnComponentInitializedHook,
8    BuilderContext, NodeAdapter,
9};
10use alloy_primitives::{BlockNumber, B256};
11use eyre::{Context, OptionExt};
12use rayon::ThreadPoolBuilder;
13use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
14use reth_config::{config::EtlConfig, PruneConfig};
15use reth_consensus::noop::NoopConsensus;
16use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
17use reth_db_common::init::{init_genesis, InitStorageError};
18use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
19use reth_engine_local::MiningMode;
20use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
21use reth_evm::noop::NoopBlockExecutorProvider;
22use reth_fs_util as fs;
23use reth_invalid_block_hooks::InvalidBlockWitnessHook;
24use reth_network_p2p::headers::client::HeadersClient;
25use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
26use reth_node_core::{
27    args::InvalidBlockHookType,
28    dirs::{ChainPath, DataDirPath},
29    node_config::NodeConfig,
30    primitives::BlockHeader,
31    version::{
32        BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
33        VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
34    },
35};
36use reth_node_metrics::{
37    chain::ChainSpecInfo,
38    hooks::Hooks,
39    recorder::install_prometheus_recorder,
40    server::{MetricServer, MetricServerConfig},
41    version::VersionInfo,
42};
43use reth_primitives::Head;
44use reth_provider::{
45    providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
46    BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
47    ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
48};
49use reth_prune::{PruneModes, PrunerBuilder};
50use reth_rpc_api::clients::EthApiClient;
51use reth_rpc_builder::config::RethRpcServerConfig;
52use reth_rpc_layer::JwtSecret;
53use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
54use reth_static_file::StaticFileProducer;
55use reth_tasks::TaskExecutor;
56use reth_tracing::tracing::{debug, error, info, warn};
57use reth_transaction_pool::TransactionPool;
58use tokio::sync::{
59    mpsc::{unbounded_channel, UnboundedSender},
60    oneshot, watch,
61};
62
63/// Reusable setup for launching a node.
64///
65/// This provides commonly used boilerplate for launching a node.
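///
/// # Example
///
/// A minimal sketch of the typical launch chaining (not compiled as a doc-test); it assumes a
/// `TaskExecutor`, a data dir and a `NodeConfig` were created elsewhere:
///
/// ```ignore
/// let ctx = LaunchContext::new(task_executor, data_dir)
///     .with_configured_globals()
///     // attach the `NodeConfig` and the loaded `reth.toml` config
///     .with_loaded_toml_config(node_config)?
///     // add CLI trusted peers to the toml config
///     .with_resolved_peers()?;
/// ```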
66#[derive(Debug, Clone)]
67pub struct LaunchContext {
68    /// The task executor for the node.
69    pub task_executor: TaskExecutor,
70    /// The data directory for the node.
71    pub data_dir: ChainPath<DataDirPath>,
72}
73
74impl LaunchContext {
75    /// Create a new instance of the default node launcher.
76    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
77        Self { task_executor, data_dir }
78    }
79
80    /// Create launch context with attachment.
81    pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
82        LaunchContextWith { inner: self, attachment }
83    }
84
85    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
86    /// `config`.
87    ///
88    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
89    pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
90        self,
91        config: NodeConfig<ChainSpec>,
92    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
93        let toml_config = self.load_toml_config(&config)?;
94        Ok(self.with(WithConfigs { config, toml_config }))
95    }
96
97    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
98    /// `config`.
99    ///
100    /// Returns an error if the config file at the resolved path cannot be loaded.
101    pub fn load_toml_config<ChainSpec: EthChainSpec>(
102        &self,
103        config: &NodeConfig<ChainSpec>,
104    ) -> eyre::Result<reth_config::Config> {
105        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
106
107        let mut toml_config = reth_config::Config::from_path(&config_path)
108            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
109
110        Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
111
112        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
113
114        // Update the config with the command line arguments
115        toml_config.peers.trusted_nodes_only = config.network.trusted_only;
116
117        Ok(toml_config)
118    }
119
120    /// Saves the prune config to the toml file if the node is a full node.
121    fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
122        reth_config: &mut reth_config::Config,
123        config: &NodeConfig<ChainSpec>,
124        config_path: impl AsRef<std::path::Path>,
125    ) -> eyre::Result<()> {
126        if reth_config.prune.is_none() {
127            if let Some(prune_config) = config.prune_config() {
128                reth_config.update_prune_config(prune_config);
129                info!(target: "reth::cli", "Saving prune config to toml file");
130                reth_config.save(config_path.as_ref())?;
131            }
132        } else if config.prune_config().is_none() {
133            warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
134        }
135        Ok(())
136    }
137
138    /// Convenience function to [`Self::configure_globals`]
139    pub fn with_configured_globals(self) -> Self {
140        self.configure_globals();
141        self
142    }
143
144    /// Configures global settings. This includes:
145    ///
146    /// - Raising the file descriptor limit
147    /// - Configuring the global rayon thread pool
148    pub fn configure_globals(&self) {
149        // Raise the fd limit of the process.
150        // Does not do anything on windows.
151        match fdlimit::raise_fd_limit() {
152            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
153                debug!(from, to, "Raised file descriptor limit");
154            }
155            Ok(fdlimit::Outcome::Unsupported) => {}
156            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
157        }
158
159        // Limit the global rayon thread pool, reserving 1 core for the rest of the system.
160        // If the system only has 1 core the pool will use it.
161        let num_threads =
162            available_parallelism().map_or(0, |num| num.get().saturating_sub(1).max(1));
163        if let Err(err) = ThreadPoolBuilder::new()
164            .num_threads(num_threads)
165            .thread_name(|i| format!("reth-rayon-{i}"))
166            .build_global()
167        {
168            error!(%err, "Failed to build global thread pool")
169        }
170    }
171}
172
173/// A [`LaunchContext`] along with an additional value.
174///
175/// This can be used to sequentially attach additional values to the type during the launch process.
176///
177/// The type provides common boilerplate for launching a node depending on the additional value.
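///
/// # Example
///
/// A sketch of how values are attached during launch (not compiled as a doc-test); `launch_ctx`,
/// `configs` and `db` are assumed to exist already:
///
/// ```ignore
/// // `with` wraps the first attachment, `attach` pairs it with a second one.
/// let ctx = launch_ctx.with(configs).attach(db);
/// // Both attachments stay accessible through the `Attached` pair.
/// let _configs = ctx.left();
/// let _db = ctx.right();
/// ```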
178#[derive(Debug, Clone)]
179pub struct LaunchContextWith<T> {
180    /// The wrapped launch context.
181    pub inner: LaunchContext,
182    /// The additional attached value.
183    pub attachment: T,
184}
185
186impl<T> LaunchContextWith<T> {
187    /// Configures global settings. This includes:
188    ///
189    /// - Raising the file descriptor limit
190    /// - Configuring the global rayon thread pool
191    pub fn configure_globals(&self) {
192        self.inner.configure_globals();
193    }
194
195    /// Returns the data directory.
196    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
197        &self.inner.data_dir
198    }
199
200    /// Returns the task executor.
201    pub const fn task_executor(&self) -> &TaskExecutor {
202        &self.inner.task_executor
203    }
204
205    /// Attaches another value to the launch context.
206    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
207        LaunchContextWith {
208            inner: self.inner,
209            attachment: Attached::new(self.attachment, attachment),
210        }
211    }
212
213    /// Consumes the type and calls a function with a reference to the context.
214    /// Returns the context again.
215    pub fn inspect<F>(self, f: F) -> Self
216    where
217        F: FnOnce(&Self),
218    {
219        f(&self);
220        self
221    }
222}
223
224impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
225    /// Resolves the trusted peers and adds them to the toml config.
226    pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
227        if !self.attachment.config.network.trusted_peers.is_empty() {
228            info!(target: "reth::cli", "Adding trusted nodes");
229
230            self.attachment
231                .toml_config
232                .peers
233                .trusted_nodes
234                .extend(self.attachment.config.network.trusted_peers.clone());
235        }
236        Ok(self)
237    }
238}
239
240impl<L, R> LaunchContextWith<Attached<L, R>> {
241    /// Get a reference to the left value.
242    pub const fn left(&self) -> &L {
243        &self.attachment.left
244    }
245
246    /// Get a reference to the right value.
247    pub const fn right(&self) -> &R {
248        &self.attachment.right
249    }
250
251    /// Get a mutable reference to the left value.
252    pub fn left_mut(&mut self) -> &mut L {
253        &mut self.attachment.left
254    }
255
256    /// Get a mutable reference to the right value.
257    pub fn right_mut(&mut self) -> &mut R {
258        &mut self.attachment.right
259    }
260}
261impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
262    /// Adjust certain settings in the config to make sure they are set correctly
263    ///
264    /// This includes:
265    /// - Making sure the ETL dir is set to the datadir
266    /// - Adjusting the RPC port numbers based on the instance number
267    pub fn with_adjusted_configs(self) -> Self {
268        self.ensure_etl_datadir().with_adjusted_instance_ports()
269    }
270
271    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
272    pub fn ensure_etl_datadir(mut self) -> Self {
273        if self.toml_config_mut().stages.etl.dir.is_none() {
274            self.toml_config_mut().stages.etl.dir =
275                Some(EtlConfig::from_datadir(self.data_dir().data_dir()))
276        }
277
278        self
279    }
280
281    /// Change rpc port numbers based on the instance number.
282    pub fn with_adjusted_instance_ports(mut self) -> Self {
283        self.node_config_mut().adjust_instance_ports();
284        self
285    }
286
287    /// Returns the container for all config types
288    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
289        self.attachment.left()
290    }
291
292    /// Returns the attached [`NodeConfig`].
293    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
294        &self.left().config
295    }
296
297    /// Returns the attached [`NodeConfig`].
298    pub fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
299        &mut self.left_mut().config
300    }
301
302    /// Returns the attached toml config [`reth_config::Config`].
303    pub const fn toml_config(&self) -> &reth_config::Config {
304        &self.left().toml_config
305    }
306
307    /// Returns the attached toml config [`reth_config::Config`].
308    pub fn toml_config_mut(&mut self) -> &mut reth_config::Config {
309        &mut self.left_mut().toml_config
310    }
311
312    /// Returns the configured chain spec.
313    pub fn chain_spec(&self) -> Arc<ChainSpec> {
314        self.node_config().chain.clone()
315    }
316
317    /// Get the hash of the genesis block.
318    pub fn genesis_hash(&self) -> B256 {
319        self.node_config().chain.genesis_hash()
320    }
321
322    /// Returns the chain identifier of the node.
323    pub fn chain_id(&self) -> Chain {
324        self.node_config().chain.chain()
325    }
326
327    /// Returns true if the node is configured as --dev
328    pub const fn is_dev(&self) -> bool {
329        self.node_config().dev.dev
330    }
331
332    /// Returns the configured [`PruneConfig`].
333    /// Any configuration set via the CLI takes precedence over the values set in the toml file.
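    ///
    /// # Example
    ///
    /// A sketch of the precedence rules (not compiled as a doc-test):
    ///
    /// ```ignore
    /// // CLI prune settings (e.g. `--full`) are merged on top of the values from `reth.toml`;
    /// // without any CLI settings the toml configuration is used as-is.
    /// let prune_config = ctx.prune_config().unwrap_or_default();
    /// let prune_modes = ctx.prune_modes();
    /// ```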
334    pub fn prune_config(&self) -> Option<PruneConfig> {
335        let Some(mut node_prune_config) = self.node_config().prune_config() else {
336            // No CLI config is set, use the toml config.
337            return self.toml_config().prune.clone();
338        };
339
340        // Otherwise, use the CLI configuration and merge with toml config.
341        node_prune_config.merge(self.toml_config().prune.clone());
342        Some(node_prune_config)
343    }
344
345    /// Returns the configured [`PruneModes`], returning the default if no config was available.
346    pub fn prune_modes(&self) -> PruneModes {
347        self.prune_config().map(|config| config.segments).unwrap_or_default()
348    }
349
350    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
351    pub fn pruner_builder(&self) -> PrunerBuilder {
352        PrunerBuilder::new(self.prune_config().unwrap_or_default())
353            .delete_limit(self.chain_spec().prune_delete_limit())
354            .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
355    }
356
357    /// Loads the JWT secret for the engine API
358    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
359        let default_jwt_path = self.data_dir().jwt();
360        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
361        Ok(secret)
362    }
363
364    /// Returns the [`MiningMode`] intended for --dev mode.
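    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doc-test); `pool` is assumed to be the node's transaction pool:
    ///
    /// ```ignore
    /// // With `--dev.block-time <duration>` blocks are produced on a fixed interval,
    /// // otherwise a block is produced whenever a transaction enters the pool.
    /// let mining_mode = ctx.dev_mining_mode(pool);
    /// ```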
365    pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
366        if let Some(interval) = self.node_config().dev.block_time {
367            MiningMode::interval(interval)
368        } else {
369            MiningMode::instant(pool)
370        }
371    }
372}
373
374impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
375where
376    DB: Database + Clone + 'static,
377    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
378{
379    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistency check
380    /// between the database and static files. **It may execute a pipeline unwind if it fails this
381    /// check.**
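    ///
    /// # Example
    ///
    /// A sketch of typical usage from a launcher (not compiled as a doc-test), where `N` is the
    /// node's provider types:
    ///
    /// ```ignore
    /// // Either build the factory directly...
    /// let factory = ctx.create_provider_factory::<N>().await?;
    /// // ...or attach it to the launch context in one step.
    /// let ctx = ctx.with_provider_factory::<N>().await?;
    /// ```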
382    pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
383    where
384        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
385    {
386        let factory = ProviderFactory::new(
387            self.right().clone(),
388            self.chain_spec(),
389            StaticFileProvider::read_write(self.data_dir().static_files())?,
390        )
391        .with_prune_modes(self.prune_modes())
392        .with_static_files_metrics();
393
394        let has_receipt_pruning =
395            self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
396
397        // Check for consistency between database and static files. If it fails, it unwinds to
398        // the first block that's consistent between database and static files.
399        if let Some(unwind_target) = factory
400            .static_file_provider()
401            .check_consistency(&factory.provider()?, has_receipt_pruning)?
402        {
403            // Highly unlikely to happen, and given its destructive nature, it's better to panic
404            // instead.
405            assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");
406
407            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
408
409            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
410
411            // Builds an unwind-only pipeline
412            let pipeline = PipelineBuilder::default()
413                .add_stages(DefaultStages::new(
414                    factory.clone(),
415                    tip_rx,
416                    Arc::new(NoopConsensus::default()),
417                    NoopHeaderDownloader::default(),
418                    NoopBodiesDownloader::default(),
419                    NoopBlockExecutorProvider::<N::Primitives>::default(),
420                    self.toml_config().stages.clone(),
421                    self.prune_modes(),
422                ))
423                .build(
424                    factory.clone(),
425                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
426                );
427
428            // Unwinds to block
429            let (tx, rx) = oneshot::channel();
430
431            // Pipeline should be run as blocking and panic if it fails.
432            self.task_executor().spawn_critical_blocking(
433                "pipeline task",
434                Box::pin(async move {
435                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
436                    let _ = tx.send(result);
437                }),
438            );
439            rx.await??;
440        }
441
442        Ok(factory)
443    }
444
445    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
446    pub async fn with_provider_factory<N>(
447        self,
448    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
449    where
450        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
451    {
452        let factory = self.create_provider_factory().await?;
453        let ctx = LaunchContextWith {
454            inner: self.inner,
455            attachment: self.attachment.map_right(|_| factory),
456        };
457
458        Ok(ctx)
459    }
460}
461
462impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
463where
464    T: ProviderNodeTypes,
465{
466    /// Returns access to the underlying database.
467    pub const fn database(&self) -> &T::DB {
468        self.right().db_ref()
469    }
470
471    /// Returns the configured `ProviderFactory`.
472    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
473        self.right()
474    }
475
476    /// Returns the static file provider to interact with the static files.
477    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
478        self.right().static_file_provider()
479    }
480
481    /// This launches the prometheus endpoint.
482    ///
483    /// Convenience function to [`Self::start_prometheus_endpoint`]
484    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
485        self.start_prometheus_endpoint().await?;
486        Ok(self)
487    }
488
489    /// Starts the prometheus endpoint.
490    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
491        // ensure recorder runs upkeep periodically
492        install_prometheus_recorder().spawn_upkeep();
493
494        let listen_addr = self.node_config().metrics;
495        if let Some(addr) = listen_addr {
496            info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
497            let config = MetricServerConfig::new(
498                addr,
499                VersionInfo {
500                    version: CARGO_PKG_VERSION,
501                    build_timestamp: VERGEN_BUILD_TIMESTAMP,
502                    cargo_features: VERGEN_CARGO_FEATURES,
503                    git_sha: VERGEN_GIT_SHA,
504                    target_triple: VERGEN_CARGO_TARGET_TRIPLE,
505                    build_profile: BUILD_PROFILE_NAME,
506                },
507                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
508                self.task_executor().clone(),
509                Hooks::builder()
510                    .with_hook({
511                        let db = self.database().clone();
512                        move || db.report_metrics()
513                    })
514                    .with_hook({
515                        let sfp = self.static_file_provider();
516                        move || {
517                            if let Err(error) = sfp.report_metrics() {
518                                error!(%error, "Failed to report metrics for the static file provider");
519                            }
520                        }
521                    })
522                    .build(),
523            );
524
525            MetricServer::new(config).serve().await?;
526        }
527
528        Ok(())
529    }
530
531    /// Convenience function to [`Self::init_genesis`]
532    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
533        init_genesis(self.provider_factory())?;
534        Ok(self)
535    }
536
537    /// Write the genesis block and state if it has not already been written
538    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
539        init_genesis(self.provider_factory())
540    }
541
542    /// Creates a new `WithMeteredProvider` container and attaches it to the
543    /// launch context.
544    ///
545    /// This spawns a metrics task that listens for metrics related events and updates metrics for
546    /// prometheus.
547    pub fn with_metrics_task(
548        self,
549    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
550        let (metrics_sender, metrics_receiver) = unbounded_channel();
551
552        let with_metrics =
553            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
554
555        debug!(target: "reth::cli", "Spawning stages metrics listener task");
556        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
557        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
558
559        LaunchContextWith {
560            inner: self.inner,
561            attachment: self.attachment.map_right(|_| with_metrics),
562        }
563    }
564}
565
566impl<N, DB>
567    LaunchContextWith<
568        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
569    >
570where
571    N: NodeTypes,
572    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
573{
574    /// Returns the configured `ProviderFactory`.
575    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
576        &self.right().provider_factory
577    }
578
579    /// Returns the metrics sender.
580    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
581        self.right().metrics_sender.clone()
582    }
583
584    /// Creates a `BlockchainProvider` and attaches it to the launch context.
585    #[allow(clippy::complexity)]
586    pub fn with_blockchain_db<T, F>(
587        self,
588        create_blockchain_provider: F,
589    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
590    where
591        T: FullNodeTypes<Types = N, DB = DB>,
592        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
593    {
594        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
595
596        let metered_providers = WithMeteredProviders {
597            db_provider_container: WithMeteredProvider {
598                provider_factory: self.provider_factory().clone(),
599                metrics_sender: self.sync_metrics_tx(),
600            },
601            blockchain_db,
602        };
603
604        let ctx = LaunchContextWith {
605            inner: self.inner,
606            attachment: self.attachment.map_right(|_| metered_providers),
607        };
608
609        Ok(ctx)
610    }
611}
612
613impl<T>
614    LaunchContextWith<
615        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
616    >
617where
618    T: FullNodeTypes<Types: NodeTypesForProvider>,
619{
620    /// Returns access to the underlying database.
621    pub const fn database(&self) -> &T::DB {
622        self.provider_factory().db_ref()
623    }
624
625    /// Returns the configured `ProviderFactory`.
626    pub const fn provider_factory(
627        &self,
628    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
629        &self.right().db_provider_container.provider_factory
630    }
631
632    /// Fetches the head block from the database.
633    ///
634    /// If the database is empty, returns the genesis block.
635    pub fn lookup_head(&self) -> eyre::Result<Head> {
636        self.node_config()
637            .lookup_head(self.provider_factory())
638            .wrap_err("the head block is missing")
639    }
640
641    /// Returns the metrics sender.
642    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
643        self.right().db_provider_container.metrics_sender.clone()
644    }
645
646    /// Returns a reference to the blockchain provider.
647    pub const fn blockchain_db(&self) -> &T::Provider {
648        &self.right().blockchain_db
649    }
650
651    /// Creates a `NodeAdapter` and attaches it to the launch context.
652    pub async fn with_components<CB>(
653        self,
654        components_builder: CB,
655        on_component_initialized: Box<
656            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
657        >,
658    ) -> eyre::Result<
659        LaunchContextWith<
660            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
661        >,
662    >
663    where
664        CB: NodeComponentsBuilder<T>,
665    {
666        // fetch the head block from the database
667        let head = self.lookup_head()?;
668
669        let builder_ctx = BuilderContext::new(
670            head,
671            self.blockchain_db().clone(),
672            self.task_executor().clone(),
673            self.configs().clone(),
674        );
675
676        debug!(target: "reth::cli", "creating components");
677        let components = components_builder.build_components(&builder_ctx).await?;
678
679        let blockchain_db = self.blockchain_db().clone();
680
681        let node_adapter = NodeAdapter {
682            components,
683            task_executor: self.task_executor().clone(),
684            provider: blockchain_db,
685        };
686
687        debug!(target: "reth::cli", "calling on_component_initialized hook");
688        on_component_initialized.on_event(node_adapter.clone())?;
689
690        let components_container = WithComponents {
691            db_provider_container: WithMeteredProvider {
692                provider_factory: self.provider_factory().clone(),
693                metrics_sender: self.sync_metrics_tx(),
694            },
695            node_adapter,
696            head,
697        };
698
699        let ctx = LaunchContextWith {
700            inner: self.inner,
701            attachment: self.attachment.map_right(|_| components_container),
702        };
703
704        Ok(ctx)
705    }
706}
707
708impl<T, CB>
709    LaunchContextWith<
710        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
711    >
712where
713    T: FullNodeTypes<Types: NodeTypesForProvider>,
714    CB: NodeComponentsBuilder<T>,
715{
716    /// Returns the configured `ProviderFactory`.
717    pub const fn provider_factory(
718        &self,
719    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
720        &self.right().db_provider_container.provider_factory
721    }
722
723    /// Returns the max block that the node should run to, looking it up from the network if
724    /// necessary
725    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
726    where
727        C: HeadersClient<Header: BlockHeader>,
728    {
729        self.node_config().max_block(client, self.provider_factory().clone()).await
730    }
731
732    /// Returns the static file provider to interact with the static files.
733    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
734        self.provider_factory().static_file_provider()
735    }
736
737    /// Creates a new [`StaticFileProducer`] with the attached database.
738    pub fn static_file_producer(
739        &self,
740    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
741        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
742    }
743
744    /// Returns the current head block.
745    pub const fn head(&self) -> Head {
746        self.right().head
747    }
748
749    /// Returns the configured `NodeAdapter`.
750    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
751        &self.right().node_adapter
752    }
753
754    /// Returns mutable reference to the configured `NodeAdapter`.
755    pub fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
756        &mut self.right_mut().node_adapter
757    }
758
759    /// Returns a reference to the blockchain provider.
760    pub const fn blockchain_db(&self) -> &T::Provider {
761        &self.node_adapter().provider
762    }
763
764    /// Returns the initial backfill target to sync to at launch.
765    ///
766    /// This returns the configured `debug.tip` if set. Otherwise it checks whether backfill was
767    /// previously interrupted and, if so, returns the block hash of the first stage's checkpoint;
768    /// see also [`Self::check_pipeline_consistency`].
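    ///
    /// # Example
    ///
    /// A sketch of how a launcher can combine this with
    /// [`Self::terminate_after_initial_backfill`] (not compiled as a doc-test):
    ///
    /// ```ignore
    /// if let Some(target) = ctx.initial_backfill_target()? {
    ///     // run backfill sync up to `target` before going live
    /// }
    /// if ctx.terminate_after_initial_backfill() {
    ///     // exit after the initial backfill run (`--debug.max-block` / `--debug.terminate`)
    /// }
    /// ```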
769    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
770        let mut initial_target = self.node_config().debug.tip;
771
772        if initial_target.is_none() {
773            initial_target = self.check_pipeline_consistency()?;
774        }
775
776        Ok(initial_target)
777    }
778
779    /// Returns true if the node should terminate after the initial backfill run.
780    ///
781    /// This is the case if any of these configs are set:
782    /// - `--debug.max-block`
783    /// - `--debug.terminate`
784    pub const fn terminate_after_initial_backfill(&self) -> bool {
785        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
786    }
787
788    /// Ensures that the database matches chain-specific requirements.
789    ///
790    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
791    /// bedrock height)
792    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
793        if self.chain_spec().is_optimism() &&
794            !self.is_dev() &&
795            self.chain_id() == Chain::optimism_mainnet()
796        {
797            let latest = self.blockchain_db().last_block_number()?;
798            // bedrock height
799            if latest < 105235063 {
800                error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended");
801                return Err(ProviderError::BestBlockNotFound)
802            }
803        }
804
805        Ok(())
806    }
807
808    /// Checks if the pipeline is consistent (all stages have checkpoint block numbers no less
809    /// than the checkpoint of the first stage).
810    ///
811    /// This will return the pipeline target if:
812    ///  * the pipeline was interrupted during its previous run
813    ///  * a new stage was added
814    ///  * stage data was dropped manually through `reth stage drop ...`
815    ///
816    /// # Returns
817    ///
818    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
819    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
820        // If no target was provided, check if the stages are congruent, i.e. whether the
821        // checkpoint of every stage matches the checkpoint of the first.
822        let first_stage_checkpoint = self
823            .blockchain_db()
824            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
825            .unwrap_or_default()
826            .block_number;
827
828        // Skip the first stage, as we've already retrieved it and are comparing all other
829        // checkpoints against it.
830        for stage_id in StageId::ALL.iter().skip(1) {
831            let stage_checkpoint = self
832                .blockchain_db()
833                .get_stage_checkpoint(*stage_id)?
834                .unwrap_or_default()
835                .block_number;
836
837            // If the checkpoint of any stage is less than the checkpoint of the first stage,
838            // retrieve and return the block hash of the latest header and use it as the target.
839            if stage_checkpoint < first_stage_checkpoint {
840                debug!(
841                    target: "consensus::engine",
842                    first_stage_checkpoint,
843                    inconsistent_stage_id = %stage_id,
844                    inconsistent_stage_checkpoint = stage_checkpoint,
845                    "Pipeline sync progress is inconsistent"
846                );
847                return self.blockchain_db().block_hash(first_stage_checkpoint);
848            }
849        }
850
851        self.ensure_chain_specific_db_checks()?;
852
853        Ok(None)
854    }
855
856    /// Returns the metrics sender.
857    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
858        self.right().db_provider_container.metrics_sender.clone()
859    }
860
861    /// Returns the node adapter components.
862    pub const fn components(&self) -> &CB::Components {
863        &self.node_adapter().components
864    }
865}
866
867impl<T, CB>
868    LaunchContextWith<
869        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
870    >
871where
872    T: FullNodeTypes<
873        Provider: StateProviderFactory + ChainSpecProvider,
874        Types: NodeTypesForProvider,
875    >,
876    CB: NodeComponentsBuilder<T>,
877{
878    /// Returns the [`InvalidBlockHook`] to use for the node.
879    pub fn invalid_block_hook(
880        &self,
881    ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
882        let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
883            return Ok(Box::new(NoopInvalidBlockHook::default()))
884        };
885        let healthy_node_rpc_client = self.get_healthy_node_client()?;
886
887        let output_directory = self.data_dir().invalid_block_hooks();
888        let hooks = hook
889            .iter()
890            .copied()
891            .map(|hook| {
892                let output_directory = output_directory.join(hook.to_string());
893                fs::create_dir_all(&output_directory)?;
894
895                Ok(match hook {
896                    InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
897                        self.blockchain_db().clone(),
898                        self.components().block_executor().clone(),
899                        output_directory,
900                        healthy_node_rpc_client.clone(),
901                    )),
902                    InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
903                        eyre::bail!("invalid block hook {hook:?} is not implemented yet")
904                    }
905                } as Box<dyn InvalidBlockHook<_>>)
906            })
907            .collect::<Result<_, _>>()?;
908
909        Ok(Box::new(InvalidBlockHooks(hooks)))
910    }
911
912    /// Returns an RPC client for the healthy node, if configured in the node config.
913    fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
914        self.node_config()
915            .debug
916            .healthy_node_rpc_url
917            .as_ref()
918            .map(|url| {
919                let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
920
921                // Verify that the healthy node is running the same chain as the current node.
922                let chain_id = futures::executor::block_on(async {
923                    EthApiClient::<
924                        alloy_rpc_types::Transaction,
925                        alloy_rpc_types::Block,
926                        alloy_rpc_types::Receipt,
927                        alloy_rpc_types::Header,
928                    >::chain_id(&client)
929                    .await
930                })?
931                .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
932                if chain_id.to::<u64>() != self.chain_id().id() {
933                    eyre::bail!("invalid chain id for healthy node: {chain_id}")
934                }
935
936                Ok(client)
937            })
938            .transpose()
939    }
940}
941
942/// Joins two attachments together.
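///
/// # Example
///
/// A small sketch using only the helpers defined below:
///
/// ```ignore
/// let attached = Attached::new(1u64, "right");
/// assert_eq!(*attached.left(), 1);
/// // `map_right` replaces the right value while keeping the left one.
/// let attached = attached.map_right(|s: &str| s.len());
/// assert_eq!(*attached.right(), 5);
/// ```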
943#[derive(Clone, Copy, Debug)]
944pub struct Attached<L, R> {
945    left: L,
946    right: R,
947}
948
949impl<L, R> Attached<L, R> {
950    /// Creates a new `Attached` with the given values.
951    pub const fn new(left: L, right: R) -> Self {
952        Self { left, right }
953    }
954
955    /// Maps the left value to a new value.
956    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
957    where
958        F: FnOnce(L) -> T,
959    {
960        Attached::new(f(self.left), self.right)
961    }
962
963    /// Maps the right value to a new value.
964    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
965    where
966        F: FnOnce(R) -> T,
967    {
968        Attached::new(self.left, f(self.right))
969    }
970
971    /// Get a reference to the left value.
972    pub const fn left(&self) -> &L {
973        &self.left
974    }
975
976    /// Get a reference to the right value.
977    pub const fn right(&self) -> &R {
978        &self.right
979    }
980
981    /// Get a mutable reference to the left value.
982    pub fn left_mut(&mut self) -> &mut L {
983        &mut self.left
984    }
985
986    /// Get a mutable reference to the right value.
987    pub fn right_mut(&mut self) -> &mut R {
988        &mut self.right
989    }
990}
991
992/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
993/// reth.toml config
994#[derive(Debug)]
995pub struct WithConfigs<ChainSpec> {
996    /// The configured node config, usually derived from the CLI.
997    pub config: NodeConfig<ChainSpec>,
998    /// The loaded reth.toml config.
999    pub toml_config: reth_config::Config,
1000}
1001
1002impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1003    fn clone(&self) -> Self {
1004        Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1005    }
1006}
1007
1008/// Helper container type to bundle the [`ProviderFactory`] and the metrics
1009/// sender.
1010#[derive(Debug, Clone)]
1011pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1012    provider_factory: ProviderFactory<N>,
1013    metrics_sender: UnboundedSender<MetricEvent>,
1014}
1015
1016/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
1017/// and a metrics sender.
1018#[allow(missing_debug_implementations)]
1019pub struct WithMeteredProviders<T>
1020where
1021    T: FullNodeTypes,
1022{
1023    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1024    blockchain_db: T::Provider,
1025}
1026
1027/// Helper container to bundle the metered providers container and [`NodeAdapter`].
1028#[allow(missing_debug_implementations)]
1029pub struct WithComponents<T, CB>
1030where
1031    T: FullNodeTypes,
1032    CB: NodeComponentsBuilder<T>,
1033{
1034    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1035    node_adapter: NodeAdapter<T, CB::Components>,
1036    head: Head,
1037}
1038
1039#[cfg(test)]
1040mod tests {
1041    use super::{LaunchContext, NodeConfig};
1042    use reth_config::Config;
1043    use reth_node_core::args::PruningArgs;
1044
1045    const EXTENSION: &str = "toml";
1046
1047    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1048        let temp_dir = tempfile::tempdir().unwrap();
1049        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1050        proc(&config_path);
1051        temp_dir.close().unwrap()
1052    }
1053
1054    #[test]
1055    fn test_save_prune_config() {
1056        with_tempdir("prune-store-test", |config_path| {
1057            let mut reth_config = Config::default();
1058            let node_config = NodeConfig {
1059                pruning: PruningArgs {
1060                    full: true,
1061                    block_interval: None,
1062                    sender_recovery_full: false,
1063                    sender_recovery_distance: None,
1064                    sender_recovery_before: None,
1065                    transaction_lookup_full: false,
1066                    transaction_lookup_distance: None,
1067                    transaction_lookup_before: None,
1068                    receipts_full: false,
1069                    receipts_distance: None,
1070                    receipts_before: None,
1071                    account_history_full: false,
1072                    account_history_distance: None,
1073                    account_history_before: None,
1074                    storage_history_full: false,
1075                    storage_history_distance: None,
1076                    storage_history_before: None,
1077                    receipts_log_filter: vec![],
1078                },
1079                ..NodeConfig::test()
1080            };
1081            LaunchContext::save_pruning_config_if_full_node(
1082                &mut reth_config,
1083                &node_config,
1084                config_path,
1085            )
1086            .unwrap();
1087
1088            let loaded_config = Config::from_path(config_path).unwrap();
1089
1090            assert_eq!(reth_config, loaded_config);
1091        })
1092    }
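
    // A small sanity check for the `Attached` accessors defined above; it only relies on items
    // from this module and is added here as a sketch.
    #[test]
    fn test_attached_accessors() {
        use super::Attached;

        let mut attached = Attached::new(1u64, "right");
        assert_eq!(*attached.left(), 1);
        assert_eq!(*attached.right(), "right");

        // mutate the left value in place
        *attached.left_mut() += 1;
        assert_eq!(*attached.left(), 2);

        // map the right value to its length
        let attached = attached.map_right(str::len);
        assert_eq!(*attached.right(), 5);
    }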
1093}