1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
69 BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70 StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory,
71};
72use reth_prune::{PruneModes, PrunerBuilder};
73use reth_rpc_builder::config::RethRpcServerConfig;
74use reth_rpc_layer::JwtSecret;
75use reth_stages::{
76 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
77 StageId,
78};
79use reth_static_file::StaticFileProducer;
80use reth_tasks::TaskExecutor;
81use reth_tracing::tracing::{debug, error, info, warn};
82use reth_transaction_pool::TransactionPool;
83use std::{sync::Arc, thread::available_parallelism};
84use tokio::sync::{
85 mpsc::{unbounded_channel, UnboundedSender},
86 oneshot, watch,
87};
88
89use futures::{future::Either, stream, Stream, StreamExt};
90use reth_node_ethstats::EthStatsService;
91use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
92
93#[derive(Debug, Clone)]
112pub struct LaunchContext {
113 pub task_executor: TaskExecutor,
115 pub data_dir: ChainPath<DataDirPath>,
117}
118
119impl LaunchContext {
120 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
122 Self { task_executor, data_dir }
123 }
124
125 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
127 LaunchContextWith { inner: self, attachment }
128 }
129
130 pub fn with_loaded_toml_config<ChainSpec>(
135 self,
136 config: NodeConfig<ChainSpec>,
137 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
138 where
139 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
140 {
141 let toml_config = self.load_toml_config(&config)?;
142 Ok(self.with(WithConfigs { config, toml_config }))
143 }
144
145 pub fn load_toml_config<ChainSpec>(
150 &self,
151 config: &NodeConfig<ChainSpec>,
152 ) -> eyre::Result<reth_config::Config>
153 where
154 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
155 {
156 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
157
158 let mut toml_config = reth_config::Config::from_path(&config_path)
159 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
160
161 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
162
163 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
164
165 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
167
168 toml_config.static_files = config.static_files.merge_with_config(toml_config.static_files);
170
171 Ok(toml_config)
172 }
173
174 fn save_pruning_config<ChainSpec>(
177 reth_config: &mut reth_config::Config,
178 config: &NodeConfig<ChainSpec>,
179 config_path: impl AsRef<std::path::Path>,
180 ) -> eyre::Result<()>
181 where
182 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
183 {
184 if let Some(prune_config) = config.prune_config() {
185 if reth_config.prune != prune_config {
186 reth_config.set_prune_config(prune_config);
187 info!(target: "reth::cli", "Saving prune config to toml file");
188 reth_config.save(config_path.as_ref())?;
189 }
190 } else if !reth_config.prune.is_default() {
191 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
192 }
193 Ok(())
194 }
195
196 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
198 self.configure_globals(reserved_cpu_cores);
199 self
200 }
201
202 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
209 match fdlimit::raise_fd_limit() {
212 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
213 debug!(from, to, "Raised file descriptor limit");
214 }
215 Ok(fdlimit::Outcome::Unsupported) => {}
216 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
217 }
218
219 let num_threads = available_parallelism()
223 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
224 if let Err(err) = ThreadPoolBuilder::new()
225 .num_threads(num_threads)
226 .thread_name(|i| format!("reth-rayon-{i}"))
227 .build_global()
228 {
229 warn!(%err, "Failed to build global thread pool")
230 }
231 }
232}
233
234#[derive(Debug, Clone)]
245pub struct LaunchContextWith<T> {
246 pub inner: LaunchContext,
248 pub attachment: T,
250}
251
252impl<T> LaunchContextWith<T> {
253 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
258 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
259 }
260
261 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
263 &self.inner.data_dir
264 }
265
266 pub const fn task_executor(&self) -> &TaskExecutor {
268 &self.inner.task_executor
269 }
270
271 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
273 LaunchContextWith {
274 inner: self.inner,
275 attachment: Attached::new(self.attachment, attachment),
276 }
277 }
278
279 pub fn inspect<F>(self, f: F) -> Self
282 where
283 F: FnOnce(&Self),
284 {
285 f(&self);
286 self
287 }
288}
289
290impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
291 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
293 if !self.attachment.config.network.trusted_peers.is_empty() {
294 info!(target: "reth::cli", "Adding trusted nodes");
295
296 self.attachment
297 .toml_config
298 .peers
299 .trusted_nodes
300 .extend(self.attachment.config.network.trusted_peers.clone());
301 }
302 Ok(self)
303 }
304}
305
306impl<L, R> LaunchContextWith<Attached<L, R>> {
307 pub const fn left(&self) -> &L {
309 &self.attachment.left
310 }
311
312 pub const fn right(&self) -> &R {
314 &self.attachment.right
315 }
316
317 pub const fn left_mut(&mut self) -> &mut L {
319 &mut self.attachment.left
320 }
321
322 pub const fn right_mut(&mut self) -> &mut R {
324 &mut self.attachment.right
325 }
326}
327impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
328 pub fn with_adjusted_configs(self) -> Self {
334 self.ensure_etl_datadir().with_adjusted_instance_ports()
335 }
336
337 pub fn ensure_etl_datadir(mut self) -> Self {
339 if self.toml_config_mut().stages.etl.dir.is_none() {
340 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
341 if etl_path.exists() {
342 if let Err(err) = fs::remove_dir_all(&etl_path) {
344 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
345 }
346 }
347 self.toml_config_mut().stages.etl.dir = Some(etl_path);
348 }
349
350 self
351 }
352
353 pub fn with_adjusted_instance_ports(mut self) -> Self {
355 self.node_config_mut().adjust_instance_ports();
356 self
357 }
358
359 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
361 self.attachment.left()
362 }
363
364 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
366 &self.left().config
367 }
368
369 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
371 &mut self.left_mut().config
372 }
373
374 pub const fn toml_config(&self) -> &reth_config::Config {
376 &self.left().toml_config
377 }
378
379 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
381 &mut self.left_mut().toml_config
382 }
383
384 pub fn chain_spec(&self) -> Arc<ChainSpec> {
386 self.node_config().chain.clone()
387 }
388
389 pub fn genesis_hash(&self) -> B256 {
391 self.node_config().chain.genesis_hash()
392 }
393
394 pub fn chain_id(&self) -> Chain {
396 self.node_config().chain.chain()
397 }
398
399 pub const fn is_dev(&self) -> bool {
401 self.node_config().dev.dev
402 }
403
404 pub fn prune_config(&self) -> PruneConfig
408 where
409 ChainSpec: reth_chainspec::EthereumHardforks,
410 {
411 let Some(mut node_prune_config) = self.node_config().prune_config() else {
412 return self.toml_config().prune.clone();
414 };
415
416 node_prune_config.merge(self.toml_config().prune.clone());
418 node_prune_config
419 }
420
421 pub fn prune_modes(&self) -> PruneModes
423 where
424 ChainSpec: reth_chainspec::EthereumHardforks,
425 {
426 self.prune_config().segments
427 }
428
429 pub fn pruner_builder(&self) -> PrunerBuilder
431 where
432 ChainSpec: reth_chainspec::EthereumHardforks,
433 {
434 PrunerBuilder::new(self.prune_config())
435 }
436
437 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
439 let default_jwt_path = self.data_dir().jwt();
440 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
441 Ok(secret)
442 }
443
444 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
446 where
447 Pool: TransactionPool + Unpin,
448 {
449 if let Some(interval) = self.node_config().dev.block_time {
450 MiningMode::interval(interval)
451 } else {
452 MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
453 }
454 }
455}
456
457impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
458where
459 DB: Database + Clone + 'static,
460 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
461{
462 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
466 where
467 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
468 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
469 {
470 let static_files_config = &self.toml_config().static_files;
472 static_files_config.validate()?;
473
474 let static_file_provider =
476 StaticFileProviderBuilder::read_write(self.data_dir().static_files())?
477 .with_metrics()
478 .with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
479 .build()?;
480
481 let factory =
482 ProviderFactory::new(self.right().clone(), self.chain_spec(), static_file_provider)?
483 .with_prune_modes(self.prune_modes());
484
485 if let Some(unwind_target) =
488 factory.static_file_provider().check_consistency(&factory.provider()?)?
489 {
490 assert_ne!(
493 unwind_target,
494 PipelineTarget::Unwind(0),
495 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
496 );
497
498 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
499
500 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
501
502 let pipeline = PipelineBuilder::default()
504 .add_stages(DefaultStages::new(
505 factory.clone(),
506 tip_rx,
507 Arc::new(NoopConsensus::default()),
508 NoopHeaderDownloader::default(),
509 NoopBodiesDownloader::default(),
510 NoopEvmConfig::<Evm>::default(),
511 self.toml_config().stages.clone(),
512 self.prune_modes(),
513 None,
514 ))
515 .build(
516 factory.clone(),
517 StaticFileProducer::new(factory.clone(), self.prune_modes()),
518 );
519
520 let (tx, rx) = oneshot::channel();
522
523 self.task_executor().spawn_critical_blocking(
525 "pipeline task",
526 Box::pin(async move {
527 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
528 let _ = tx.send(result);
529 }),
530 );
531 rx.await?.inspect_err(|err| {
532 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
533 })?;
534 }
535
536 Ok(factory)
537 }
538
539 pub async fn with_provider_factory<N, Evm>(
541 self,
542 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
543 where
544 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
545 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
546 {
547 let factory = self.create_provider_factory::<N, Evm>().await?;
548 let ctx = LaunchContextWith {
549 inner: self.inner,
550 attachment: self.attachment.map_right(|_| factory),
551 };
552
553 Ok(ctx)
554 }
555}
556
557impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
558where
559 T: ProviderNodeTypes,
560{
561 pub const fn database(&self) -> &T::DB {
563 self.right().db_ref()
564 }
565
566 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
568 self.right()
569 }
570
571 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
573 self.right().static_file_provider()
574 }
575
576 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
580 self.start_prometheus_endpoint().await?;
581 Ok(self)
582 }
583
584 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
586 install_prometheus_recorder().spawn_upkeep();
588
589 let listen_addr = self.node_config().metrics.prometheus;
590 if let Some(addr) = listen_addr {
591 let config = MetricServerConfig::new(
592 addr,
593 VersionInfo {
594 version: version_metadata().cargo_pkg_version.as_ref(),
595 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
596 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
597 git_sha: version_metadata().vergen_git_sha.as_ref(),
598 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
599 build_profile: version_metadata().build_profile_name.as_ref(),
600 },
601 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
602 self.task_executor().clone(),
603 Hooks::builder()
604 .with_hook({
605 let db = self.database().clone();
606 move || db.report_metrics()
607 })
608 .with_hook({
609 let sfp = self.static_file_provider();
610 move || {
611 if let Err(error) = sfp.report_metrics() {
612 error!(%error, "Failed to report metrics for the static file provider");
613 }
614 }
615 })
616 .build(),
617 ).with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
618
619 MetricServer::new(config).serve().await?;
620 }
621
622 Ok(())
623 }
624
625 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
627 init_genesis_with_settings(
628 self.provider_factory(),
629 self.node_config().static_files.to_settings(),
630 )?;
631 Ok(self)
632 }
633
634 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
636 init_genesis_with_settings(
637 self.provider_factory(),
638 self.node_config().static_files.to_settings(),
639 )
640 }
641
642 pub fn with_metrics_task(
648 self,
649 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
650 let (metrics_sender, metrics_receiver) = unbounded_channel();
651
652 let with_metrics =
653 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
654
655 debug!(target: "reth::cli", "Spawning stages metrics listener task");
656 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
657 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
658
659 LaunchContextWith {
660 inner: self.inner,
661 attachment: self.attachment.map_right(|_| with_metrics),
662 }
663 }
664}
665
666impl<N, DB>
667 LaunchContextWith<
668 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
669 >
670where
671 N: NodeTypes,
672 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
673{
674 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
676 &self.right().provider_factory
677 }
678
679 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
681 self.right().metrics_sender.clone()
682 }
683
684 #[expect(clippy::complexity)]
686 pub fn with_blockchain_db<T, F>(
687 self,
688 create_blockchain_provider: F,
689 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
690 where
691 T: FullNodeTypes<Types = N, DB = DB>,
692 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
693 {
694 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
695
696 let metered_providers = WithMeteredProviders {
697 db_provider_container: WithMeteredProvider {
698 provider_factory: self.provider_factory().clone(),
699 metrics_sender: self.sync_metrics_tx(),
700 },
701 blockchain_db,
702 };
703
704 let ctx = LaunchContextWith {
705 inner: self.inner,
706 attachment: self.attachment.map_right(|_| metered_providers),
707 };
708
709 Ok(ctx)
710 }
711}
712
713impl<T>
714 LaunchContextWith<
715 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
716 >
717where
718 T: FullNodeTypes<Types: NodeTypesForProvider>,
719{
720 pub const fn database(&self) -> &T::DB {
722 self.provider_factory().db_ref()
723 }
724
725 pub const fn provider_factory(
727 &self,
728 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
729 &self.right().db_provider_container.provider_factory
730 }
731
732 pub fn lookup_head(&self) -> eyre::Result<Head> {
736 self.node_config()
737 .lookup_head(self.provider_factory())
738 .wrap_err("the head block is missing")
739 }
740
741 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
743 self.right().db_provider_container.metrics_sender.clone()
744 }
745
746 pub const fn blockchain_db(&self) -> &T::Provider {
748 &self.right().blockchain_db
749 }
750
751 pub async fn with_components<CB>(
753 self,
754 components_builder: CB,
755 on_component_initialized: Box<
756 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
757 >,
758 ) -> eyre::Result<
759 LaunchContextWith<
760 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
761 >,
762 >
763 where
764 CB: NodeComponentsBuilder<T>,
765 {
766 let head = self.lookup_head()?;
768
769 let builder_ctx = BuilderContext::new(
770 head,
771 self.blockchain_db().clone(),
772 self.task_executor().clone(),
773 self.configs().clone(),
774 );
775
776 debug!(target: "reth::cli", "creating components");
777 let components = components_builder.build_components(&builder_ctx).await?;
778
779 let blockchain_db = self.blockchain_db().clone();
780
781 let node_adapter = NodeAdapter {
782 components,
783 task_executor: self.task_executor().clone(),
784 provider: blockchain_db,
785 };
786
787 debug!(target: "reth::cli", "calling on_component_initialized hook");
788 on_component_initialized.on_event(node_adapter.clone())?;
789
790 let components_container = WithComponents {
791 db_provider_container: WithMeteredProvider {
792 provider_factory: self.provider_factory().clone(),
793 metrics_sender: self.sync_metrics_tx(),
794 },
795 node_adapter,
796 head,
797 };
798
799 let ctx = LaunchContextWith {
800 inner: self.inner,
801 attachment: self.attachment.map_right(|_| components_container),
802 };
803
804 Ok(ctx)
805 }
806}
807
808impl<T, CB>
809 LaunchContextWith<
810 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
811 >
812where
813 T: FullNodeTypes<Types: NodeTypesForProvider>,
814 CB: NodeComponentsBuilder<T>,
815{
816 pub const fn provider_factory(
818 &self,
819 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
820 &self.right().db_provider_container.provider_factory
821 }
822
823 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
826 where
827 C: HeadersClient<Header: BlockHeader>,
828 {
829 self.node_config().max_block(client, self.provider_factory().clone()).await
830 }
831
832 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
834 self.provider_factory().static_file_provider()
835 }
836
837 pub fn static_file_producer(
839 &self,
840 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
841 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
842 }
843
844 pub const fn head(&self) -> Head {
846 self.right().head
847 }
848
849 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
851 &self.right().node_adapter
852 }
853
854 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
856 &mut self.right_mut().node_adapter
857 }
858
859 pub const fn blockchain_db(&self) -> &T::Provider {
861 &self.node_adapter().provider
862 }
863
864 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
870 let mut initial_target = self.node_config().debug.tip;
871
872 if initial_target.is_none() {
873 initial_target = self.check_pipeline_consistency()?;
874 }
875
876 Ok(initial_target)
877 }
878
879 pub const fn terminate_after_initial_backfill(&self) -> bool {
885 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
886 }
887
888 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
893 if self.chain_spec().is_optimism() &&
894 !self.is_dev() &&
895 self.chain_id() == Chain::optimism_mainnet()
896 {
897 let latest = self.blockchain_db().last_block_number()?;
898 if latest < 105235063 {
900 error!(
901 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
902 );
903 return Err(ProviderError::BestBlockNotFound)
904 }
905 }
906
907 Ok(())
908 }
909
910 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
922 let first_stage_checkpoint = self
925 .blockchain_db()
926 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
927 .unwrap_or_default()
928 .block_number;
929
930 for stage_id in StageId::ALL.iter().skip(1) {
933 let stage_checkpoint = self
934 .blockchain_db()
935 .get_stage_checkpoint(*stage_id)?
936 .unwrap_or_default()
937 .block_number;
938
939 if stage_checkpoint < first_stage_checkpoint {
942 debug!(
943 target: "consensus::engine",
944 first_stage_checkpoint,
945 inconsistent_stage_id = %stage_id,
946 inconsistent_stage_checkpoint = stage_checkpoint,
947 "Pipeline sync progress is inconsistent"
948 );
949 return self.blockchain_db().block_hash(first_stage_checkpoint);
950 }
951 }
952
953 self.ensure_chain_specific_db_checks()?;
954
955 Ok(None)
956 }
957
958 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
960 self.right().db_provider_container.metrics_sender.clone()
961 }
962
963 pub const fn components(&self) -> &CB::Components {
965 &self.node_adapter().components
966 }
967
968 #[allow(clippy::type_complexity)]
970 pub async fn launch_exex(
971 &self,
972 installed_exex: Vec<(
973 String,
974 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
975 )>,
976 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
977 ExExLauncher::new(
978 self.head(),
979 self.node_adapter().clone(),
980 installed_exex,
981 self.configs().clone(),
982 )
983 .launch()
984 .await
985 }
986
987 pub fn era_import_source(&self) -> Option<EraImportSource> {
991 let node_config = self.node_config();
992 if !node_config.era.enabled {
993 return None;
994 }
995
996 EraImportSource::maybe_new(
997 node_config.era.source.path.clone(),
998 node_config.era.source.url.clone(),
999 || node_config.chain.chain().kind().default_era_host(),
1000 || node_config.datadir().data_dir().join("era").into(),
1001 )
1002 }
1003
1004 pub fn consensus_layer_events(
1012 &self,
1013 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1014 where
1015 T::Provider: reth_provider::CanonChainTracker,
1016 {
1017 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1018 Either::Left(
1019 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1020 .map(Into::into),
1021 )
1022 } else {
1023 Either::Right(stream::empty())
1024 }
1025 }
1026
1027 pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1029 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1030
1031 let network = self.components().network().clone();
1032 let pool = self.components().pool().clone();
1033 let provider = self.node_adapter().provider.clone();
1034
1035 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1036
1037 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1038 tokio::spawn(async move { ethstats.run().await });
1039
1040 Ok(())
1041 }
1042}
1043
1044#[derive(Clone, Copy, Debug)]
1050pub struct Attached<L, R> {
1051 left: L,
1052 right: R,
1053}
1054
1055impl<L, R> Attached<L, R> {
1056 pub const fn new(left: L, right: R) -> Self {
1058 Self { left, right }
1059 }
1060
1061 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1063 where
1064 F: FnOnce(L) -> T,
1065 {
1066 Attached::new(f(self.left), self.right)
1067 }
1068
1069 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1071 where
1072 F: FnOnce(R) -> T,
1073 {
1074 Attached::new(self.left, f(self.right))
1075 }
1076
1077 pub const fn left(&self) -> &L {
1079 &self.left
1080 }
1081
1082 pub const fn right(&self) -> &R {
1084 &self.right
1085 }
1086
1087 pub const fn left_mut(&mut self) -> &mut L {
1089 &mut self.left
1090 }
1091
1092 pub const fn right_mut(&mut self) -> &mut R {
1094 &mut self.right
1095 }
1096}
1097
1098#[derive(Debug)]
1101pub struct WithConfigs<ChainSpec> {
1102 pub config: NodeConfig<ChainSpec>,
1104 pub toml_config: reth_config::Config,
1106}
1107
1108impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1109 fn clone(&self) -> Self {
1110 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1111 }
1112}
1113
1114#[derive(Debug, Clone)]
1117pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1118 provider_factory: ProviderFactory<N>,
1119 metrics_sender: UnboundedSender<MetricEvent>,
1120}
1121
1122#[expect(missing_debug_implementations)]
1125pub struct WithMeteredProviders<T>
1126where
1127 T: FullNodeTypes,
1128{
1129 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1130 blockchain_db: T::Provider,
1131}
1132
1133#[expect(missing_debug_implementations)]
1135pub struct WithComponents<T, CB>
1136where
1137 T: FullNodeTypes,
1138 CB: NodeComponentsBuilder<T>,
1139{
1140 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1141 node_adapter: NodeAdapter<T, CB::Components>,
1142 head: Head,
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::{LaunchContext, NodeConfig};
1148 use reth_config::Config;
1149 use reth_node_core::args::PruningArgs;
1150
1151 const EXTENSION: &str = "toml";
1152
1153 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1154 let temp_dir = tempfile::tempdir().unwrap();
1155 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1156 proc(&config_path);
1157 temp_dir.close().unwrap()
1158 }
1159
1160 #[test]
1161 fn test_save_prune_config() {
1162 with_tempdir("prune-store-test", |config_path| {
1163 let mut reth_config = Config::default();
1164 let node_config = NodeConfig {
1165 pruning: PruningArgs {
1166 full: true,
1167 block_interval: None,
1168 sender_recovery_full: false,
1169 sender_recovery_distance: None,
1170 sender_recovery_before: None,
1171 transaction_lookup_full: false,
1172 transaction_lookup_distance: None,
1173 transaction_lookup_before: None,
1174 receipts_full: false,
1175 receipts_pre_merge: false,
1176 receipts_distance: None,
1177 receipts_before: None,
1178 account_history_full: false,
1179 account_history_distance: None,
1180 account_history_before: None,
1181 storage_history_full: false,
1182 storage_history_distance: None,
1183 storage_history_before: None,
1184 bodies_pre_merge: false,
1185 bodies_distance: None,
1186 receipts_log_filter: None,
1187 bodies_before: None,
1188 },
1189 ..NodeConfig::test()
1190 };
1191 LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
1192 .unwrap();
1193
1194 let loaded_config = Config::from_path(config_path).unwrap();
1195
1196 assert_eq!(reth_config, loaded_config);
1197 })
1198 }
1199}