1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_consensus::BlockHeader as _;
38use alloy_eips::eip2124::Head;
39use alloy_primitives::{BlockNumber, B256};
40use eyre::Context;
41use rayon::ThreadPoolBuilder;
42use reth_chainspec::{Chain, EthChainSpec, EthereumHardfork, EthereumHardforks};
43use reth_config::{config::EtlConfig, PruneConfig};
44use reth_consensus::noop::NoopConsensus;
45use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
46use reth_db_common::init::{init_genesis, InitStorageError};
47use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
48use reth_engine_local::MiningMode;
49use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
50use reth_exex::ExExManagerHandle;
51use reth_fs_util as fs;
52use reth_network_p2p::headers::client::HeadersClient;
53use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
54use reth_node_core::{
55 args::DefaultEraHost,
56 dirs::{ChainPath, DataDirPath},
57 node_config::NodeConfig,
58 primitives::BlockHeader,
59 version::version_metadata,
60};
61use reth_node_metrics::{
62 chain::ChainSpecInfo,
63 hooks::Hooks,
64 recorder::install_prometheus_recorder,
65 server::{MetricServer, MetricServerConfig},
66 version::VersionInfo,
67};
68use reth_provider::{
69 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
70 BlockHashReader, BlockNumReader, BlockReaderIdExt, ProviderError, ProviderFactory,
71 ProviderResult, StageCheckpointReader, StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78 StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::tracing::{debug, error, info, warn};
83use reth_transaction_pool::TransactionPool;
84use std::{sync::Arc, thread::available_parallelism};
85use tokio::sync::{
86 mpsc::{unbounded_channel, UnboundedSender},
87 oneshot, watch,
88};
89
90use futures::{future::Either, stream, Stream, StreamExt};
91use reth_node_ethstats::EthStatsService;
92use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
93
/// Base state used while launching a node: a task executor and the node's data directory.
///
/// Additional values are layered on top with [`LaunchContext::with`], which produces a
/// [`LaunchContextWith`].
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// Executor used to spawn tasks.
    pub task_executor: TaskExecutor,
    /// Data directory paths of the node.
    pub data_dir: ChainPath<DataDirPath>,
}
119
120impl LaunchContext {
121 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
123 Self { task_executor, data_dir }
124 }
125
126 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
128 LaunchContextWith { inner: self, attachment }
129 }
130
131 pub fn with_loaded_toml_config<ChainSpec>(
136 self,
137 config: NodeConfig<ChainSpec>,
138 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
139 where
140 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
141 {
142 let toml_config = self.load_toml_config(&config)?;
143 Ok(self.with(WithConfigs { config, toml_config }))
144 }
145
146 pub fn load_toml_config<ChainSpec>(
151 &self,
152 config: &NodeConfig<ChainSpec>,
153 ) -> eyre::Result<reth_config::Config>
154 where
155 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
156 {
157 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
158
159 let mut toml_config = reth_config::Config::from_path(&config_path)
160 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
161
162 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
163
164 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
165
166 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
168
169 Ok(toml_config)
170 }
171
172 fn save_pruning_config_if_full_node<ChainSpec>(
174 reth_config: &mut reth_config::Config,
175 config: &NodeConfig<ChainSpec>,
176 config_path: impl AsRef<std::path::Path>,
177 ) -> eyre::Result<()>
178 where
179 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
180 {
181 if reth_config.prune.is_none() {
182 if let Some(prune_config) = config.prune_config() {
183 reth_config.update_prune_config(prune_config);
184 info!(target: "reth::cli", "Saving prune config to toml file");
185 reth_config.save(config_path.as_ref())?;
186 }
187 } else if config.prune_config().is_none() {
188 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
189 }
190 Ok(())
191 }
192
193 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
195 self.configure_globals(reserved_cpu_cores);
196 self
197 }
198
199 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
206 match fdlimit::raise_fd_limit() {
209 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
210 debug!(from, to, "Raised file descriptor limit");
211 }
212 Ok(fdlimit::Outcome::Unsupported) => {}
213 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
214 }
215
216 let num_threads = available_parallelism()
220 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
221 if let Err(err) = ThreadPoolBuilder::new()
222 .num_threads(num_threads)
223 .thread_name(|i| format!("reth-rayon-{i}"))
224 .build_global()
225 {
226 warn!(%err, "Failed to build global thread pool")
227 }
228 }
229}
230
/// A [`LaunchContext`] together with an arbitrary attached value.
///
/// The attachment type `T` changes as the launch process progresses; the `impl` blocks in this
/// file expose different helpers depending on what is currently attached.
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped base launch context.
    pub inner: LaunchContext,
    /// The value attached to the context.
    pub attachment: T,
}
248
249impl<T> LaunchContextWith<T> {
250 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
255 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
256 }
257
258 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
260 &self.inner.data_dir
261 }
262
263 pub const fn task_executor(&self) -> &TaskExecutor {
265 &self.inner.task_executor
266 }
267
268 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
270 LaunchContextWith {
271 inner: self.inner,
272 attachment: Attached::new(self.attachment, attachment),
273 }
274 }
275
276 pub fn inspect<F>(self, f: F) -> Self
279 where
280 F: FnOnce(&Self),
281 {
282 f(&self);
283 self
284 }
285}
286
287impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
288 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
290 if !self.attachment.config.network.trusted_peers.is_empty() {
291 info!(target: "reth::cli", "Adding trusted nodes");
292
293 self.attachment
294 .toml_config
295 .peers
296 .trusted_nodes
297 .extend(self.attachment.config.network.trusted_peers.clone());
298 }
299 Ok(self)
300 }
301}
302
303impl<L, R> LaunchContextWith<Attached<L, R>> {
304 pub const fn left(&self) -> &L {
306 &self.attachment.left
307 }
308
309 pub const fn right(&self) -> &R {
311 &self.attachment.right
312 }
313
314 pub const fn left_mut(&mut self) -> &mut L {
316 &mut self.attachment.left
317 }
318
319 pub const fn right_mut(&mut self) -> &mut R {
321 &mut self.attachment.right
322 }
323}
324impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
325 pub fn with_adjusted_configs(self) -> Self {
331 self.ensure_etl_datadir().with_adjusted_instance_ports()
332 }
333
334 pub fn ensure_etl_datadir(mut self) -> Self {
336 if self.toml_config_mut().stages.etl.dir.is_none() {
337 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
338 if etl_path.exists() {
339 if let Err(err) = fs::remove_dir_all(&etl_path) {
341 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
342 }
343 }
344 self.toml_config_mut().stages.etl.dir = Some(etl_path);
345 }
346
347 self
348 }
349
350 pub fn with_adjusted_instance_ports(mut self) -> Self {
352 self.node_config_mut().adjust_instance_ports();
353 self
354 }
355
356 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
358 self.attachment.left()
359 }
360
361 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
363 &self.left().config
364 }
365
366 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
368 &mut self.left_mut().config
369 }
370
371 pub const fn toml_config(&self) -> &reth_config::Config {
373 &self.left().toml_config
374 }
375
376 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
378 &mut self.left_mut().toml_config
379 }
380
381 pub fn chain_spec(&self) -> Arc<ChainSpec> {
383 self.node_config().chain.clone()
384 }
385
386 pub fn genesis_hash(&self) -> B256 {
388 self.node_config().chain.genesis_hash()
389 }
390
391 pub fn chain_id(&self) -> Chain {
393 self.node_config().chain.chain()
394 }
395
396 pub const fn is_dev(&self) -> bool {
398 self.node_config().dev.dev
399 }
400
401 pub fn prune_config(&self) -> Option<PruneConfig>
405 where
406 ChainSpec: reth_chainspec::EthereumHardforks,
407 {
408 let Some(mut node_prune_config) = self.node_config().prune_config() else {
409 return self.toml_config().prune.clone();
411 };
412
413 node_prune_config.merge(self.toml_config().prune.clone());
415 Some(node_prune_config)
416 }
417
418 pub fn prune_modes(&self) -> PruneModes
420 where
421 ChainSpec: reth_chainspec::EthereumHardforks,
422 {
423 self.prune_config().map(|config| config.segments).unwrap_or_default()
424 }
425
426 pub fn pruner_builder(&self) -> PrunerBuilder
428 where
429 ChainSpec: reth_chainspec::EthereumHardforks,
430 {
431 PrunerBuilder::new(self.prune_config().unwrap_or_default())
432 .delete_limit(self.chain_spec().prune_delete_limit())
433 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
434 }
435
436 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
438 let default_jwt_path = self.data_dir().jwt();
439 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
440 Ok(secret)
441 }
442
443 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
445 where
446 Pool: TransactionPool + Unpin,
447 {
448 if let Some(interval) = self.node_config().dev.block_time {
449 MiningMode::interval(interval)
450 } else {
451 MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
452 }
453 }
454}
455
456impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
457where
458 DB: Database + Clone + 'static,
459 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
460{
461 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
465 where
466 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
467 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
468 {
469 let factory = ProviderFactory::new(
470 self.right().clone(),
471 self.chain_spec(),
472 StaticFileProvider::read_write(self.data_dir().static_files())?,
473 )
474 .with_prune_modes(self.prune_modes())
475 .with_static_files_metrics();
476
477 let has_receipt_pruning =
478 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
479
480 if let Some(unwind_target) = factory
483 .static_file_provider()
484 .check_consistency(&factory.provider()?, has_receipt_pruning)?
485 {
486 assert_ne!(
489 unwind_target,
490 PipelineTarget::Unwind(0),
491 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
492 );
493
494 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
495
496 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
497
498 let pipeline = PipelineBuilder::default()
500 .add_stages(DefaultStages::new(
501 factory.clone(),
502 tip_rx,
503 Arc::new(NoopConsensus::default()),
504 NoopHeaderDownloader::default(),
505 NoopBodiesDownloader::default(),
506 NoopEvmConfig::<Evm>::default(),
507 self.toml_config().stages.clone(),
508 self.prune_modes(),
509 None,
510 ))
511 .build(
512 factory.clone(),
513 StaticFileProducer::new(factory.clone(), self.prune_modes()),
514 );
515
516 let (tx, rx) = oneshot::channel();
518
519 self.task_executor().spawn_critical_blocking(
521 "pipeline task",
522 Box::pin(async move {
523 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
524 let _ = tx.send(result);
525 }),
526 );
527 rx.await?.inspect_err(|err| {
528 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
529 })?;
530 }
531
532 Ok(factory)
533 }
534
535 pub async fn with_provider_factory<N, Evm>(
537 self,
538 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
539 where
540 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
541 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
542 {
543 let factory = self.create_provider_factory::<N, Evm>().await?;
544 let ctx = LaunchContextWith {
545 inner: self.inner,
546 attachment: self.attachment.map_right(|_| factory),
547 };
548
549 Ok(ctx)
550 }
551}
552
553impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
554where
555 T: ProviderNodeTypes,
556{
557 pub const fn database(&self) -> &T::DB {
559 self.right().db_ref()
560 }
561
562 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
564 self.right()
565 }
566
567 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
569 self.right().static_file_provider()
570 }
571
572 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
576 self.start_prometheus_endpoint().await?;
577 Ok(self)
578 }
579
580 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
582 install_prometheus_recorder().spawn_upkeep();
584
585 let listen_addr = self.node_config().metrics;
586 if let Some(addr) = listen_addr {
587 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
588 let config = MetricServerConfig::new(
589 addr,
590 VersionInfo {
591 version: version_metadata().cargo_pkg_version.as_ref(),
592 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
593 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
594 git_sha: version_metadata().vergen_git_sha.as_ref(),
595 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
596 build_profile: version_metadata().build_profile_name.as_ref(),
597 },
598 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
599 self.task_executor().clone(),
600 Hooks::builder()
601 .with_hook({
602 let db = self.database().clone();
603 move || db.report_metrics()
604 })
605 .with_hook({
606 let sfp = self.static_file_provider();
607 move || {
608 if let Err(error) = sfp.report_metrics() {
609 error!(%error, "Failed to report metrics for the static file provider");
610 }
611 }
612 })
613 .build(),
614 );
615
616 MetricServer::new(config).serve().await?;
617 }
618
619 Ok(())
620 }
621
622 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
624 init_genesis(self.provider_factory())?;
625 Ok(self)
626 }
627
628 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
630 init_genesis(self.provider_factory())
631 }
632
633 pub fn with_metrics_task(
639 self,
640 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
641 let (metrics_sender, metrics_receiver) = unbounded_channel();
642
643 let with_metrics =
644 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
645
646 debug!(target: "reth::cli", "Spawning stages metrics listener task");
647 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
648 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
649
650 LaunchContextWith {
651 inner: self.inner,
652 attachment: self.attachment.map_right(|_| with_metrics),
653 }
654 }
655}
656
657impl<N, DB>
658 LaunchContextWith<
659 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
660 >
661where
662 N: NodeTypes,
663 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
664{
665 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
667 &self.right().provider_factory
668 }
669
670 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
672 self.right().metrics_sender.clone()
673 }
674
675 #[expect(clippy::complexity)]
677 pub fn with_blockchain_db<T, F>(
678 self,
679 create_blockchain_provider: F,
680 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
681 where
682 T: FullNodeTypes<Types = N, DB = DB>,
683 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
684 {
685 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
686
687 let metered_providers = WithMeteredProviders {
688 db_provider_container: WithMeteredProvider {
689 provider_factory: self.provider_factory().clone(),
690 metrics_sender: self.sync_metrics_tx(),
691 },
692 blockchain_db,
693 };
694
695 let ctx = LaunchContextWith {
696 inner: self.inner,
697 attachment: self.attachment.map_right(|_| metered_providers),
698 };
699
700 Ok(ctx)
701 }
702}
703
704impl<T>
705 LaunchContextWith<
706 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
707 >
708where
709 T: FullNodeTypes<Types: NodeTypesForProvider>,
710{
711 pub const fn database(&self) -> &T::DB {
713 self.provider_factory().db_ref()
714 }
715
716 pub const fn provider_factory(
718 &self,
719 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
720 &self.right().db_provider_container.provider_factory
721 }
722
723 pub fn lookup_head(&self) -> eyre::Result<Head> {
727 self.node_config()
728 .lookup_head(self.provider_factory())
729 .wrap_err("the head block is missing")
730 }
731
732 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
734 self.right().db_provider_container.metrics_sender.clone()
735 }
736
737 pub const fn blockchain_db(&self) -> &T::Provider {
739 &self.right().blockchain_db
740 }
741
742 pub async fn with_components<CB>(
744 self,
745 components_builder: CB,
746 on_component_initialized: Box<
747 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
748 >,
749 ) -> eyre::Result<
750 LaunchContextWith<
751 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
752 >,
753 >
754 where
755 CB: NodeComponentsBuilder<T>,
756 {
757 let head = self.lookup_head()?;
759
760 let builder_ctx = BuilderContext::new(
761 head,
762 self.blockchain_db().clone(),
763 self.task_executor().clone(),
764 self.configs().clone(),
765 );
766
767 debug!(target: "reth::cli", "creating components");
768 let components = components_builder.build_components(&builder_ctx).await?;
769
770 let blockchain_db = self.blockchain_db().clone();
771
772 let node_adapter = NodeAdapter {
773 components,
774 task_executor: self.task_executor().clone(),
775 provider: blockchain_db,
776 };
777
778 debug!(target: "reth::cli", "calling on_component_initialized hook");
779 on_component_initialized.on_event(node_adapter.clone())?;
780
781 let components_container = WithComponents {
782 db_provider_container: WithMeteredProvider {
783 provider_factory: self.provider_factory().clone(),
784 metrics_sender: self.sync_metrics_tx(),
785 },
786 node_adapter,
787 head,
788 };
789
790 let ctx = LaunchContextWith {
791 inner: self.inner,
792 attachment: self.attachment.map_right(|_| components_container),
793 };
794
795 Ok(ctx)
796 }
797}
798
799impl<T, CB>
800 LaunchContextWith<
801 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
802 >
803where
804 T: FullNodeTypes<Types: NodeTypesForProvider>,
805 CB: NodeComponentsBuilder<T>,
806{
807 pub const fn provider_factory(
809 &self,
810 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
811 &self.right().db_provider_container.provider_factory
812 }
813
814 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
817 where
818 C: HeadersClient<Header: BlockHeader>,
819 {
820 self.node_config().max_block(client, self.provider_factory().clone()).await
821 }
822
823 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
825 self.provider_factory().static_file_provider()
826 }
827
828 pub fn static_file_producer(
830 &self,
831 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
832 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
833 }
834
835 pub const fn head(&self) -> Head {
837 self.right().head
838 }
839
840 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
842 &self.right().node_adapter
843 }
844
845 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
847 &mut self.right_mut().node_adapter
848 }
849
850 pub const fn blockchain_db(&self) -> &T::Provider {
852 &self.node_adapter().provider
853 }
854
855 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
861 let mut initial_target = self.node_config().debug.tip;
862
863 if initial_target.is_none() {
864 initial_target = self.check_pipeline_consistency()?;
865 }
866
867 Ok(initial_target)
868 }
869
870 pub const fn terminate_after_initial_backfill(&self) -> bool {
876 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
877 }
878
879 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
884 if self.chain_spec().is_optimism() &&
885 !self.is_dev() &&
886 self.chain_id() == Chain::optimism_mainnet()
887 {
888 let latest = self.blockchain_db().last_block_number()?;
889 if latest < 105235063 {
891 error!(
892 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
893 );
894 return Err(ProviderError::BestBlockNotFound)
895 }
896 }
897
898 Ok(())
899 }
900
901 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
913 let first_stage_checkpoint = self
916 .blockchain_db()
917 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
918 .unwrap_or_default()
919 .block_number;
920
921 for stage_id in StageId::ALL.iter().skip(1) {
924 let stage_checkpoint = self
925 .blockchain_db()
926 .get_stage_checkpoint(*stage_id)?
927 .unwrap_or_default()
928 .block_number;
929
930 if stage_checkpoint < first_stage_checkpoint {
933 debug!(
934 target: "consensus::engine",
935 first_stage_checkpoint,
936 inconsistent_stage_id = %stage_id,
937 inconsistent_stage_checkpoint = stage_checkpoint,
938 "Pipeline sync progress is inconsistent"
939 );
940 return self.blockchain_db().block_hash(first_stage_checkpoint);
941 }
942 }
943
944 self.ensure_chain_specific_db_checks()?;
945
946 Ok(None)
947 }
948
949 pub fn expire_pre_merge_transactions(&self) -> eyre::Result<()>
955 where
956 T: FullNodeTypes<Provider: StaticFileProviderFactory>,
957 {
958 if self.node_config().pruning.bodies_pre_merge &&
959 let Some(merge_block) = self
960 .chain_spec()
961 .ethereum_fork_activation(EthereumHardfork::Paris)
962 .block_number()
963 {
964 let Some(latest) = self.blockchain_db().latest_header()? else { return Ok(()) };
966 if latest.number() > merge_block {
967 let provider = self.blockchain_db().static_file_provider();
968 if provider
969 .get_lowest_transaction_static_file_block()
970 .is_some_and(|lowest| lowest < merge_block)
971 {
972 info!(target: "reth::cli", merge_block, "Expiring pre-merge transactions");
973 provider.delete_transactions_below(merge_block)?;
974 } else {
975 debug!(target: "reth::cli", merge_block, "No pre-merge transactions to expire");
976 }
977 }
978 }
979
980 Ok(())
981 }
982
983 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
985 self.right().db_provider_container.metrics_sender.clone()
986 }
987
988 pub const fn components(&self) -> &CB::Components {
990 &self.node_adapter().components
991 }
992
993 #[allow(clippy::type_complexity)]
995 pub async fn launch_exex(
996 &self,
997 installed_exex: Vec<(
998 String,
999 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1000 )>,
1001 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1002 ExExLauncher::new(
1003 self.head(),
1004 self.node_adapter().clone(),
1005 installed_exex,
1006 self.configs().clone(),
1007 )
1008 .launch()
1009 .await
1010 }
1011
1012 pub fn era_import_source(&self) -> Option<EraImportSource> {
1016 let node_config = self.node_config();
1017 if !node_config.era.enabled {
1018 return None;
1019 }
1020
1021 EraImportSource::maybe_new(
1022 node_config.era.source.path.clone(),
1023 node_config.era.source.url.clone(),
1024 || node_config.chain.chain().kind().default_era_host(),
1025 || node_config.datadir().data_dir().join("era").into(),
1026 )
1027 }
1028
1029 pub fn consensus_layer_events(
1037 &self,
1038 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1039 where
1040 T::Provider: reth_provider::CanonChainTracker,
1041 {
1042 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1043 Either::Left(
1044 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1045 .map(Into::into),
1046 )
1047 } else {
1048 Either::Right(stream::empty())
1049 }
1050 }
1051
1052 pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1054 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1055
1056 let network = self.components().network().clone();
1057 let pool = self.components().pool().clone();
1058 let provider = self.node_adapter().provider.clone();
1059
1060 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1061
1062 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1063 tokio::spawn(async move { ethstats.run().await });
1064
1065 Ok(())
1066 }
1067}
1068
/// A pair of values, referred to as the left and the right side.
///
/// Either side can be replaced independently with [`Attached::map_left`] and
/// [`Attached::map_right`].
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    /// The left value.
    left: L,
    /// The right value.
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a pair from its two sides.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Transforms the left side with `f`, leaving the right side untouched.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Transforms the right side with `f`, leaving the left side untouched.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Returns a reference to the left side.
    pub const fn left(&self) -> &L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a reference to the right side.
    pub const fn right(&self) -> &R {
        let Self { right, .. } = self;
        right
    }

    /// Returns a mutable reference to the left side.
    pub const fn left_mut(&mut self) -> &mut L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a mutable reference to the right side.
    pub const fn right_mut(&mut self) -> &mut R {
        let Self { right, .. } = self;
        right
    }
}
1122
/// Container holding both configurations of the node: the [`NodeConfig`] and the toml config.
#[derive(Debug)]
pub struct WithConfigs<ChainSpec> {
    /// The node configuration.
    pub config: NodeConfig<ChainSpec>,
    /// The configuration loaded from the toml file.
    pub toml_config: reth_config::Config,
}
1132
1133impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1134 fn clone(&self) -> Self {
1135 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1136 }
1137}
1138
/// A [`ProviderFactory`] paired with the sender of the stage metrics channel.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    /// The provider factory.
    provider_factory: ProviderFactory<N>,
    /// Sender for [`MetricEvent`]s.
    metrics_sender: UnboundedSender<MetricEvent>,
}
1146
/// A [`WithMeteredProvider`] together with the blockchain provider of the node.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    /// The metered provider factory container.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The blockchain provider.
    blockchain_db: T::Provider,
}
1157
/// A [`WithMeteredProvider`] together with the node adapter (which holds the built components)
/// and the head that was resolved when the components were built.
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    /// The metered provider factory container.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The node adapter holding components, task executor and provider.
    node_adapter: NodeAdapter<T, CB::Components>,
    /// The head resolved before the components were built.
    head: Head,
}
1169
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// File extension given to the config file created by the tests.
    const EXTENSION: &str = "toml";

    /// Runs `proc` with a path named `filename` (plus [`EXTENSION`]) inside a fresh temporary
    /// directory, and closes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let scratch_dir = tempfile::tempdir().unwrap();

        let mut config_path = scratch_dir.path().join(filename);
        config_path.set_extension(EXTENSION);

        proc(&config_path);
        scratch_dir.close().unwrap()
    }

    /// With `full` pruning requested and no prune section in the config, the config written to
    /// disk must equal the updated in-memory config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_pre_merge: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                bodies_pre_merge: false,
                bodies_distance: None,
                receipts_log_filter: None,
                bodies_before: None,
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut in_memory_config = Config::default();
            LaunchContext::save_pruning_config_if_full_node(
                &mut in_memory_config,
                &node_config,
                config_path,
            )
            .unwrap();

            let on_disk_config = Config::from_path(config_path).unwrap();

            assert_eq!(in_memory_config, on_disk_config);
        })
    }
}