1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
69 BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70 RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
71 StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78 StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::{
83 throttle,
84 tracing::{debug, error, info, warn},
85};
86use reth_transaction_pool::TransactionPool;
87use reth_trie_db::ChangesetCache;
88use std::{sync::Arc, thread::available_parallelism, time::Duration};
89use tokio::sync::{
90 mpsc::{unbounded_channel, UnboundedSender},
91 oneshot, watch,
92};
93
94use futures::{future::Either, stream, Stream, StreamExt};
95use reth_node_ethstats::EthStatsService;
96use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
97
98#[derive(Debug, Clone)]
117pub struct LaunchContext {
118 pub task_executor: TaskExecutor,
120 pub data_dir: ChainPath<DataDirPath>,
122}
123
124impl LaunchContext {
125 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
127 Self { task_executor, data_dir }
128 }
129
130 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
132 LaunchContextWith { inner: self, attachment }
133 }
134
135 pub fn with_loaded_toml_config<ChainSpec>(
140 self,
141 config: NodeConfig<ChainSpec>,
142 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
143 where
144 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
145 {
146 let toml_config = self.load_toml_config(&config)?;
147 Ok(self.with(WithConfigs { config, toml_config }))
148 }
149
150 pub fn load_toml_config<ChainSpec>(
155 &self,
156 config: &NodeConfig<ChainSpec>,
157 ) -> eyre::Result<reth_config::Config>
158 where
159 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
160 {
161 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
162
163 let mut toml_config = reth_config::Config::from_path(&config_path)
164 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
165
166 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
167
168 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
169
170 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
172
173 toml_config.static_files =
175 config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
176
177 Ok(toml_config)
178 }
179
180 fn save_pruning_config<ChainSpec>(
183 reth_config: &mut reth_config::Config,
184 config: &NodeConfig<ChainSpec>,
185 config_path: impl AsRef<std::path::Path>,
186 ) -> eyre::Result<()>
187 where
188 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
189 {
190 let mut should_save = reth_config.prune.segments.migrate();
191
192 if let Some(prune_config) = config.prune_config() {
193 if reth_config.prune != prune_config {
194 reth_config.set_prune_config(prune_config);
195 should_save = true;
196 }
197 } else if !reth_config.prune.is_default() {
198 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
199 }
200
201 if should_save {
202 info!(target: "reth::cli", "Saving prune config to toml file");
203 reth_config.save(config_path.as_ref())?;
204 }
205
206 Ok(())
207 }
208
209 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
211 self.configure_globals(reserved_cpu_cores);
212 self
213 }
214
215 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
220 match fdlimit::raise_fd_limit() {
223 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
224 debug!(from, to, "Raised file descriptor limit");
225 }
226 Ok(fdlimit::Outcome::Unsupported) => {}
227 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
228 }
229
230 let num_threads = available_parallelism()
232 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
233 if let Err(err) = ThreadPoolBuilder::new()
234 .num_threads(num_threads)
235 .thread_name(|i| format!("rayon-{i:02}"))
236 .build_global()
237 {
238 warn!(%err, "Failed to build global thread pool")
239 }
240 }
241}
242
243#[derive(Debug, Clone)]
254pub struct LaunchContextWith<T> {
255 pub inner: LaunchContext,
257 pub attachment: T,
259}
260
261impl<T> LaunchContextWith<T> {
262 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
267 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
268 }
269
270 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
272 &self.inner.data_dir
273 }
274
275 pub const fn task_executor(&self) -> &TaskExecutor {
277 &self.inner.task_executor
278 }
279
280 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
282 LaunchContextWith {
283 inner: self.inner,
284 attachment: Attached::new(self.attachment, attachment),
285 }
286 }
287
288 pub fn inspect<F>(self, f: F) -> Self
291 where
292 F: FnOnce(&Self),
293 {
294 f(&self);
295 self
296 }
297}
298
299impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
300 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
302 if !self.attachment.config.network.trusted_peers.is_empty() {
303 info!(target: "reth::cli", "Adding trusted nodes");
304
305 self.attachment
306 .toml_config
307 .peers
308 .trusted_nodes
309 .extend(self.attachment.config.network.trusted_peers.clone());
310 }
311 Ok(self)
312 }
313}
314
315impl<L, R> LaunchContextWith<Attached<L, R>> {
316 pub const fn left(&self) -> &L {
318 &self.attachment.left
319 }
320
321 pub const fn right(&self) -> &R {
323 &self.attachment.right
324 }
325
326 pub const fn left_mut(&mut self) -> &mut L {
328 &mut self.attachment.left
329 }
330
331 pub const fn right_mut(&mut self) -> &mut R {
333 &mut self.attachment.right
334 }
335}
336impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
337 pub fn with_adjusted_configs(self) -> Self {
343 self.ensure_etl_datadir().with_adjusted_instance_ports()
344 }
345
346 pub fn ensure_etl_datadir(mut self) -> Self {
348 if self.toml_config_mut().stages.etl.dir.is_none() {
349 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
350 if etl_path.exists() {
351 if let Err(err) = fs::remove_dir_all(&etl_path) {
353 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
354 }
355 }
356 self.toml_config_mut().stages.etl.dir = Some(etl_path);
357 }
358
359 self
360 }
361
362 pub fn with_adjusted_instance_ports(mut self) -> Self {
364 self.node_config_mut().adjust_instance_ports();
365 self
366 }
367
368 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
370 self.attachment.left()
371 }
372
373 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
375 &self.left().config
376 }
377
378 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
380 &mut self.left_mut().config
381 }
382
383 pub const fn toml_config(&self) -> &reth_config::Config {
385 &self.left().toml_config
386 }
387
388 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
390 &mut self.left_mut().toml_config
391 }
392
393 pub fn chain_spec(&self) -> Arc<ChainSpec> {
395 self.node_config().chain.clone()
396 }
397
398 pub fn genesis_hash(&self) -> B256 {
400 self.node_config().chain.genesis_hash()
401 }
402
403 pub fn chain_id(&self) -> Chain {
405 self.node_config().chain.chain()
406 }
407
408 pub const fn is_dev(&self) -> bool {
410 self.node_config().dev.dev
411 }
412
413 pub fn prune_config(&self) -> PruneConfig
417 where
418 ChainSpec: reth_chainspec::EthereumHardforks,
419 {
420 let Some(mut node_prune_config) = self.node_config().prune_config() else {
421 return self.toml_config().prune.clone();
423 };
424
425 node_prune_config.merge(self.toml_config().prune.clone());
427 node_prune_config
428 }
429
430 pub fn prune_modes(&self) -> PruneModes
432 where
433 ChainSpec: reth_chainspec::EthereumHardforks,
434 {
435 self.prune_config().segments
436 }
437
438 pub fn pruner_builder(&self) -> PrunerBuilder
440 where
441 ChainSpec: reth_chainspec::EthereumHardforks,
442 {
443 PrunerBuilder::new(self.prune_config())
444 }
445
446 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
448 let default_jwt_path = self.data_dir().jwt();
449 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
450 Ok(secret)
451 }
452
453 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
455 where
456 Pool: TransactionPool + Unpin,
457 {
458 self.node_config().dev_mining_mode(pool)
459 }
460}
461
462impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
463where
464 DB: Database + Clone + 'static,
465 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
466{
467 pub async fn create_provider_factory<N, Evm>(
471 &self,
472 changeset_cache: ChangesetCache,
473 ) -> eyre::Result<ProviderFactory<N>>
474 where
475 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
476 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
477 {
478 let static_files_config = &self.toml_config().static_files;
480 static_files_config.validate()?;
481
482 let static_file_provider =
484 StaticFileProviderBuilder::read_write(self.data_dir().static_files())
485 .with_metrics()
486 .with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
487 .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
488 .build()?;
489
490 let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
492 .with_default_tables()
493 .with_metrics()
494 .with_statistics()
495 .build()?;
496
497 let factory = ProviderFactory::new(
498 self.right().clone(),
499 self.chain_spec(),
500 static_file_provider,
501 rocksdb_provider,
502 self.task_executor().clone(),
503 )?
504 .with_prune_modes(self.prune_modes())
505 .with_changeset_cache(changeset_cache);
506
507 let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
511
512 let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
514
515 if let Some(unwind_block) = unwind_target {
516 let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
519 (Some(_), Some(_)) => "RocksDB and static file",
520 (Some(_), None) => "RocksDB",
521 (None, Some(_)) => "static file",
522 (None, None) => unreachable!(),
523 };
524 assert_ne!(
525 unwind_block, 0,
526 "A {} inconsistency was found that would trigger an unwind to block 0",
527 inconsistency_source
528 );
529
530 let unwind_target = PipelineTarget::Unwind(unwind_block);
531
532 info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
533
534 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
535
536 let pipeline = PipelineBuilder::default()
538 .add_stages(DefaultStages::new(
539 factory.clone(),
540 tip_rx,
541 Arc::new(NoopConsensus::default()),
542 NoopHeaderDownloader::default(),
543 NoopBodiesDownloader::default(),
544 NoopEvmConfig::<Evm>::default(),
545 self.toml_config().stages.clone(),
546 self.prune_modes(),
547 None,
548 ))
549 .build(
550 factory.clone(),
551 StaticFileProducer::new(factory.clone(), self.prune_modes()),
552 );
553
554 let (tx, rx) = oneshot::channel();
556
557 self.task_executor().spawn_critical_blocking_task(
559 "pipeline task",
560 Box::pin(async move {
561 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
562 let _ = tx.send(result);
563 }),
564 );
565 rx.await?.inspect_err(|err| {
566 error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
567 })?;
568 }
569
570 Ok(factory)
571 }
572
573 pub async fn with_provider_factory<N, Evm>(
575 self,
576 changeset_cache: ChangesetCache,
577 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
578 where
579 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
580 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
581 {
582 let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
583 let ctx = LaunchContextWith {
584 inner: self.inner,
585 attachment: self.attachment.map_right(|_| factory),
586 };
587
588 Ok(ctx)
589 }
590}
591
592impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
593where
594 T: ProviderNodeTypes,
595{
596 pub const fn database(&self) -> &T::DB {
598 self.right().db_ref()
599 }
600
601 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
603 self.right()
604 }
605
606 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
608 self.right().static_file_provider()
609 }
610
611 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
615 self.start_prometheus_endpoint().await?;
616 Ok(self)
617 }
618
619 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
621 install_prometheus_recorder().spawn_upkeep();
623
624 let listen_addr = self.node_config().metrics.prometheus;
625 if let Some(addr) = listen_addr {
626 let config = MetricServerConfig::new(
627 addr,
628 VersionInfo {
629 version: version_metadata().cargo_pkg_version.as_ref(),
630 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
631 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
632 git_sha: version_metadata().vergen_git_sha.as_ref(),
633 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
634 build_profile: version_metadata().build_profile_name.as_ref(),
635 },
636 ChainSpecInfo { name: self.chain_id().to_string() },
637 self.task_executor().clone(),
638 metrics_hooks(self.provider_factory()),
639 self.data_dir().pprof_dumps(),
640 )
641 .with_push_gateway(
642 self.node_config().metrics.push_gateway_url.clone(),
643 self.node_config().metrics.push_gateway_interval,
644 );
645
646 MetricServer::new(config).serve().await?;
647 }
648
649 Ok(())
650 }
651
652 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
654 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())?;
655 Ok(self)
656 }
657
658 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
660 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())
661 }
662
663 pub fn with_metrics_task(
669 self,
670 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
671 let (metrics_sender, metrics_receiver) = unbounded_channel();
672
673 let with_metrics =
674 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
675
676 debug!(target: "reth::cli", "Spawning stages metrics listener task");
677 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
678 self.task_executor()
679 .spawn_critical_task("stages metrics listener task", sync_metrics_listener);
680
681 LaunchContextWith {
682 inner: self.inner,
683 attachment: self.attachment.map_right(|_| with_metrics),
684 }
685 }
686}
687
688impl<N, DB>
689 LaunchContextWith<
690 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
691 >
692where
693 N: NodeTypes,
694 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
695{
696 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
698 &self.right().provider_factory
699 }
700
701 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
703 self.right().metrics_sender.clone()
704 }
705
706 #[expect(clippy::complexity)]
708 pub fn with_blockchain_db<T, F>(
709 self,
710 create_blockchain_provider: F,
711 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
712 where
713 T: FullNodeTypes<Types = N, DB = DB>,
714 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
715 {
716 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
717
718 let metered_providers = WithMeteredProviders {
719 db_provider_container: WithMeteredProvider {
720 provider_factory: self.provider_factory().clone(),
721 metrics_sender: self.sync_metrics_tx(),
722 },
723 blockchain_db,
724 };
725
726 let ctx = LaunchContextWith {
727 inner: self.inner,
728 attachment: self.attachment.map_right(|_| metered_providers),
729 };
730
731 Ok(ctx)
732 }
733}
734
735impl<T>
736 LaunchContextWith<
737 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
738 >
739where
740 T: FullNodeTypes<Types: NodeTypesForProvider>,
741{
742 pub const fn database(&self) -> &T::DB {
744 self.provider_factory().db_ref()
745 }
746
747 pub const fn provider_factory(
749 &self,
750 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
751 &self.right().db_provider_container.provider_factory
752 }
753
754 pub fn lookup_head(&self) -> eyre::Result<Head> {
758 self.node_config()
759 .lookup_head(self.provider_factory())
760 .wrap_err("the head block is missing")
761 }
762
763 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
765 self.right().db_provider_container.metrics_sender.clone()
766 }
767
768 pub const fn blockchain_db(&self) -> &T::Provider {
770 &self.right().blockchain_db
771 }
772
773 pub async fn with_components<CB>(
775 self,
776 components_builder: CB,
777 on_component_initialized: Box<
778 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
779 >,
780 ) -> eyre::Result<
781 LaunchContextWith<
782 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
783 >,
784 >
785 where
786 CB: NodeComponentsBuilder<T>,
787 {
788 let head = self.lookup_head()?;
790
791 let builder_ctx = BuilderContext::new(
792 head,
793 self.blockchain_db().clone(),
794 self.task_executor().clone(),
795 self.configs().clone(),
796 );
797
798 debug!(target: "reth::cli", "creating components");
799 let components = components_builder.build_components(&builder_ctx).await?;
800
801 let blockchain_db = self.blockchain_db().clone();
802
803 let node_adapter = NodeAdapter {
804 components,
805 task_executor: self.task_executor().clone(),
806 provider: blockchain_db,
807 };
808
809 debug!(target: "reth::cli", "calling on_component_initialized hook");
810 on_component_initialized.on_event(node_adapter.clone())?;
811
812 let components_container = WithComponents {
813 db_provider_container: WithMeteredProvider {
814 provider_factory: self.provider_factory().clone(),
815 metrics_sender: self.sync_metrics_tx(),
816 },
817 node_adapter,
818 head,
819 };
820
821 let ctx = LaunchContextWith {
822 inner: self.inner,
823 attachment: self.attachment.map_right(|_| components_container),
824 };
825
826 Ok(ctx)
827 }
828}
829
830impl<T, CB>
831 LaunchContextWith<
832 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
833 >
834where
835 T: FullNodeTypes<Types: NodeTypesForProvider>,
836 CB: NodeComponentsBuilder<T>,
837{
838 pub const fn provider_factory(
840 &self,
841 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
842 &self.right().db_provider_container.provider_factory
843 }
844
845 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
848 where
849 C: HeadersClient<Header: BlockHeader>,
850 {
851 self.node_config().max_block(client, self.provider_factory().clone()).await
852 }
853
854 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
856 self.provider_factory().static_file_provider()
857 }
858
859 pub fn static_file_producer(
861 &self,
862 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
863 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
864 }
865
866 pub const fn head(&self) -> Head {
868 self.right().head
869 }
870
871 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
873 &self.right().node_adapter
874 }
875
876 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
878 &mut self.right_mut().node_adapter
879 }
880
881 pub const fn blockchain_db(&self) -> &T::Provider {
883 &self.node_adapter().provider
884 }
885
886 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
892 let mut initial_target = self.node_config().debug.tip;
893
894 if initial_target.is_none() {
895 initial_target = self.check_pipeline_consistency()?;
896 }
897
898 Ok(initial_target)
899 }
900
901 pub const fn terminate_after_initial_backfill(&self) -> bool {
907 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
908 }
909
910 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
915 if self.chain_spec().is_optimism() &&
916 !self.is_dev() &&
917 self.chain_id() == Chain::optimism_mainnet()
918 {
919 let latest = self.blockchain_db().last_block_number()?;
920 if latest < 105235063 {
922 error!(
923 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
924 );
925 return Err(ProviderError::BestBlockNotFound);
926 }
927 }
928
929 Ok(())
930 }
931
932 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
944 let era_enabled = self.era_import_source().is_some();
946 let mut all_stages =
947 StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
948
949 let first_stage = all_stages.next().expect("there must be at least one stage");
951
952 let first_stage_checkpoint = self
955 .blockchain_db()
956 .get_stage_checkpoint(first_stage)?
957 .unwrap_or_default()
958 .block_number;
959
960 for stage_id in all_stages {
962 let stage_checkpoint = self
963 .blockchain_db()
964 .get_stage_checkpoint(stage_id)?
965 .unwrap_or_default()
966 .block_number;
967
968 debug!(
971 target: "consensus::engine",
972 first_stage_id = %first_stage,
973 first_stage_checkpoint,
974 stage_id = %stage_id,
975 stage_checkpoint = stage_checkpoint,
976 "Checking stage against first stage",
977 );
978 if stage_checkpoint < first_stage_checkpoint {
979 debug!(
980 target: "consensus::engine",
981 first_stage_id = %first_stage,
982 first_stage_checkpoint,
983 inconsistent_stage_id = %stage_id,
984 inconsistent_stage_checkpoint = stage_checkpoint,
985 "Pipeline sync progress is inconsistent"
986 );
987 return self.blockchain_db().block_hash(first_stage_checkpoint);
988 }
989 }
990
991 self.ensure_chain_specific_db_checks()?;
992
993 Ok(None)
994 }
995
996 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
998 self.right().db_provider_container.metrics_sender.clone()
999 }
1000
1001 pub const fn components(&self) -> &CB::Components {
1003 &self.node_adapter().components
1004 }
1005
1006 #[allow(clippy::type_complexity)]
1008 pub async fn launch_exex(
1009 &self,
1010 installed_exex: Vec<(
1011 String,
1012 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1013 )>,
1014 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1015 self.exex_launcher(installed_exex).launch().await
1016 }
1017
1018 #[allow(clippy::type_complexity)]
1030 pub fn exex_launcher(
1031 &self,
1032 installed_exex: Vec<(
1033 String,
1034 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1035 )>,
1036 ) -> ExExLauncher<NodeAdapter<T, CB::Components>> {
1037 ExExLauncher::new(
1038 self.head(),
1039 self.node_adapter().clone(),
1040 installed_exex,
1041 self.configs().clone(),
1042 )
1043 }
1044
1045 pub fn era_import_source(&self) -> Option<EraImportSource> {
1049 let node_config = self.node_config();
1050 if !node_config.era.enabled {
1051 return None;
1052 }
1053
1054 EraImportSource::maybe_new(
1055 node_config.era.source.path.clone(),
1056 node_config.era.source.url.clone(),
1057 || node_config.chain.chain().kind().default_era_host(),
1058 || node_config.datadir().data_dir().join("era").into(),
1059 )
1060 }
1061
1062 pub fn consensus_layer_events(
1070 &self,
1071 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1072 where
1073 T::Provider: reth_provider::CanonChainTracker,
1074 {
1075 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1076 Either::Left(
1077 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1078 .map(Into::into),
1079 )
1080 } else {
1081 Either::Right(stream::empty())
1082 }
1083 }
1084
1085 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1087 where
1088 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1089 + Send
1090 + Unpin
1091 + 'static,
1092 {
1093 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1094
1095 let network = self.components().network().clone();
1096 let pool = self.components().pool().clone();
1097 let provider = self.node_adapter().provider.clone();
1098
1099 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1100
1101 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1102
1103 let ethstats_for_events = ethstats.clone();
1105 let task_executor = self.task_executor().clone();
1106 task_executor.spawn_task(Box::pin(async move {
1107 while let Some(event) = engine_events.next().await {
1108 use reth_engine_primitives::ConsensusEngineEvent;
1109 match event {
1110 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1111 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1112 let block_hash = executed.recovered_block.num_hash().hash;
1113 let block_number = executed.recovered_block.num_hash().number;
1114 if let Err(e) = ethstats_for_events
1115 .report_new_payload(block_hash, block_number, duration)
1116 .await
1117 {
1118 debug!(
1119 target: "ethstats",
1120 "Failed to report new payload: {}", e
1121 );
1122 }
1123 }
1124 _ => {
1125 }
1127 }
1128 }
1129 }));
1130
1131 task_executor.spawn_task(Box::pin(async move { ethstats.run().await }));
1133
1134 Ok(())
1135 }
1136}
1137
/// A pair of two independently typed values, addressed as "left" and "right".
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    /// The left value.
    left: L,
    /// The right value.
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a pair from its two halves.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Replaces the left value with `f(left)` and keeps the right value.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Replaces the right value with `f(right)` and keeps the left value.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Returns a reference to the left value.
    pub const fn left(&self) -> &L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a reference to the right value.
    pub const fn right(&self) -> &R {
        let Self { right, .. } = self;
        right
    }

    /// Returns a mutable reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        let Self { right, .. } = self;
        right
    }
}
1191
1192#[derive(Debug)]
1195pub struct WithConfigs<ChainSpec> {
1196 pub config: NodeConfig<ChainSpec>,
1198 pub toml_config: reth_config::Config,
1200}
1201
1202impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1203 fn clone(&self) -> Self {
1204 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1205 }
1206}
1207
1208#[derive(Debug, Clone)]
1211pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1212 provider_factory: ProviderFactory<N>,
1213 metrics_sender: UnboundedSender<MetricEvent>,
1214}
1215
1216#[expect(missing_debug_implementations)]
1219pub struct WithMeteredProviders<T>
1220where
1221 T: FullNodeTypes,
1222{
1223 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1224 blockchain_db: T::Provider,
1225}
1226
1227#[expect(missing_debug_implementations)]
1229pub struct WithComponents<T, CB>
1230where
1231 T: FullNodeTypes,
1232 CB: NodeComponentsBuilder<T>,
1233{
1234 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1235 node_adapter: NodeAdapter<T, CB::Components>,
1236 head: Head,
1237}
1238
1239pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1241 Hooks::builder()
1242 .with_hook({
1243 let db = provider_factory.db_ref().clone();
1244 move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1245 })
1246 .with_hook({
1247 let sfp = provider_factory.static_file_provider();
1248 move || {
1249 throttle!(Duration::from_secs(5 * 60), || {
1250 if let Err(error) = sfp.report_metrics() {
1251 error!(%error, "Failed to report metrics from static file provider");
1252 }
1253 })
1254 }
1255 })
1256 .with_hook({
1257 let rocksdb = provider_factory.rocksdb_provider();
1258 move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1259 })
1260 .build()
1261}
1262
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// Extension given to the temporary config file.
    const EXTENSION: &str = "toml";

    /// Calls `proc` with the path of `filename` (with the [`EXTENSION`] applied) inside a fresh
    /// temporary directory, then closes the directory.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join(filename).with_extension(EXTENSION);
        proc(&file_path);
        dir.close().unwrap()
    }

    /// Pruning arguments with only `full` switched on.
    fn full_pruning_args() -> PruningArgs {
        PruningArgs {
            full: true,
            minimal: false,
            block_interval: None,
            sender_recovery_full: false,
            sender_recovery_distance: None,
            sender_recovery_before: None,
            transaction_lookup_full: false,
            transaction_lookup_distance: None,
            transaction_lookup_before: None,
            receipts_full: false,
            receipts_pre_merge: false,
            receipts_distance: None,
            receipts_before: None,
            account_history_full: false,
            account_history_distance: None,
            account_history_before: None,
            storage_history_full: false,
            storage_history_distance: None,
            storage_history_before: None,
            bodies_pre_merge: false,
            bodies_distance: None,
            receipts_log_filter: None,
            bodies_before: None,
        }
    }

    /// The config written by `save_pruning_config` must equal the in-memory config when it is
    /// loaded back from disk.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let mut in_memory = Config::default();
            let node_config = NodeConfig { pruning: full_pruning_args(), ..NodeConfig::test() };

            LaunchContext::save_pruning_config(&mut in_memory, &node_config, config_path)
                .unwrap();

            let from_disk = Config::from_path(config_path).unwrap();

            assert_eq!(in_memory, from_disk);
        })
    }
}