1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
69 BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70 RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
71 StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78 StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::{
83 throttle,
84 tracing::{debug, error, info, warn},
85};
86use reth_transaction_pool::TransactionPool;
87use reth_trie_db::ChangesetCache;
88use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism, time::Duration};
89use tokio::sync::{
90 mpsc::{unbounded_channel, UnboundedSender},
91 oneshot, watch,
92};
93
94use futures::{future::Either, stream, Stream, StreamExt};
95use reth_node_ethstats::EthStatsService;
96use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
97
/// Base context for launching a node: a task executor paired with the node's data directory.
///
/// Additional launch state is layered on top via [`LaunchContext::with`], which produces a
/// [`LaunchContextWith`].
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// Executor used to spawn tasks during launch.
    pub task_executor: TaskExecutor,
    /// Data directory paths of the node.
    pub data_dir: ChainPath<DataDirPath>,
}
123
124impl LaunchContext {
125 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
127 Self { task_executor, data_dir }
128 }
129
130 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
132 LaunchContextWith { inner: self, attachment }
133 }
134
135 pub fn with_loaded_toml_config<ChainSpec>(
140 self,
141 config: NodeConfig<ChainSpec>,
142 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
143 where
144 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
145 {
146 let toml_config = self.load_toml_config(&config)?;
147 Ok(self.with(WithConfigs { config, toml_config }))
148 }
149
150 pub fn load_toml_config<ChainSpec>(
155 &self,
156 config: &NodeConfig<ChainSpec>,
157 ) -> eyre::Result<reth_config::Config>
158 where
159 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
160 {
161 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
162
163 let mut toml_config = reth_config::Config::from_path(&config_path)
164 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
165
166 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
167
168 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
169
170 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
172
173 toml_config.static_files =
175 config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
176
177 Ok(toml_config)
178 }
179
180 fn save_pruning_config<ChainSpec>(
183 reth_config: &mut reth_config::Config,
184 config: &NodeConfig<ChainSpec>,
185 config_path: impl AsRef<std::path::Path>,
186 ) -> eyre::Result<()>
187 where
188 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
189 {
190 let mut should_save = reth_config.prune.segments.migrate();
191
192 if let Some(prune_config) = config.prune_config() {
193 if reth_config.prune != prune_config {
194 reth_config.set_prune_config(prune_config);
195 should_save = true;
196 }
197 } else if !reth_config.prune.is_default() {
198 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
199 }
200
201 if should_save {
202 info!(target: "reth::cli", "Saving prune config to toml file");
203 reth_config.save(config_path.as_ref())?;
204 }
205
206 Ok(())
207 }
208
209 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
211 self.configure_globals(reserved_cpu_cores);
212 self
213 }
214
215 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
220 match fdlimit::raise_fd_limit() {
223 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
224 debug!(from, to, "Raised file descriptor limit");
225 }
226 Ok(fdlimit::Outcome::Unsupported) => {}
227 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
228 }
229
230 let _ = reserved_cpu_cores;
234 let num_threads = available_parallelism().map_or(1, NonZeroUsize::get);
235 if let Err(err) = ThreadPoolBuilder::new()
236 .num_threads(num_threads)
237 .thread_name(|i| format!("rayon-{i:02}"))
238 .build_global()
239 {
240 warn!(%err, "Failed to build global thread pool")
241 }
242 }
243}
244
/// A [`LaunchContext`] together with an additional, typed attachment.
///
/// The attachment type changes as launch progresses, so each stage exposes only the methods
/// that are valid for the state gathered so far.
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped base context.
    pub inner: LaunchContext,
    /// The value attached to the base context.
    pub attachment: T,
}
262
263impl<T> LaunchContextWith<T> {
264 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
269 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
270 }
271
272 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
274 &self.inner.data_dir
275 }
276
277 pub const fn task_executor(&self) -> &TaskExecutor {
279 &self.inner.task_executor
280 }
281
282 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
284 LaunchContextWith {
285 inner: self.inner,
286 attachment: Attached::new(self.attachment, attachment),
287 }
288 }
289
290 pub fn inspect<F>(self, f: F) -> Self
293 where
294 F: FnOnce(&Self),
295 {
296 f(&self);
297 self
298 }
299}
300
301impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
302 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
304 if !self.attachment.config.network.trusted_peers.is_empty() {
305 info!(target: "reth::cli", "Adding trusted nodes");
306
307 self.attachment
308 .toml_config
309 .peers
310 .trusted_nodes
311 .extend(self.attachment.config.network.trusted_peers.clone());
312 }
313 Ok(self)
314 }
315}
316
317impl<L, R> LaunchContextWith<Attached<L, R>> {
318 pub const fn left(&self) -> &L {
320 &self.attachment.left
321 }
322
323 pub const fn right(&self) -> &R {
325 &self.attachment.right
326 }
327
328 pub const fn left_mut(&mut self) -> &mut L {
330 &mut self.attachment.left
331 }
332
333 pub const fn right_mut(&mut self) -> &mut R {
335 &mut self.attachment.right
336 }
337}
338impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
339 pub fn with_adjusted_configs(self) -> Self {
345 self.ensure_etl_datadir().with_adjusted_instance_ports()
346 }
347
348 pub fn ensure_etl_datadir(mut self) -> Self {
350 if self.toml_config_mut().stages.etl.dir.is_none() {
351 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
352 if etl_path.exists() {
353 if let Err(err) = fs::remove_dir_all(&etl_path) {
355 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
356 }
357 }
358 self.toml_config_mut().stages.etl.dir = Some(etl_path);
359 }
360
361 self
362 }
363
364 pub fn with_adjusted_instance_ports(mut self) -> Self {
366 self.node_config_mut().adjust_instance_ports();
367 self
368 }
369
370 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
372 self.attachment.left()
373 }
374
375 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
377 &self.left().config
378 }
379
380 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
382 &mut self.left_mut().config
383 }
384
385 pub const fn toml_config(&self) -> &reth_config::Config {
387 &self.left().toml_config
388 }
389
390 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
392 &mut self.left_mut().toml_config
393 }
394
395 pub fn chain_spec(&self) -> Arc<ChainSpec> {
397 self.node_config().chain.clone()
398 }
399
400 pub fn genesis_hash(&self) -> B256 {
402 self.node_config().chain.genesis_hash()
403 }
404
405 pub fn chain_id(&self) -> Chain {
407 self.node_config().chain.chain()
408 }
409
410 pub const fn is_dev(&self) -> bool {
412 self.node_config().dev.dev
413 }
414
415 pub fn prune_config(&self) -> PruneConfig
419 where
420 ChainSpec: reth_chainspec::EthereumHardforks,
421 {
422 let Some(mut node_prune_config) = self.node_config().prune_config() else {
423 return self.toml_config().prune.clone();
425 };
426
427 node_prune_config.merge(self.toml_config().prune.clone());
429 node_prune_config
430 }
431
432 pub fn prune_modes(&self) -> PruneModes
434 where
435 ChainSpec: reth_chainspec::EthereumHardforks,
436 {
437 self.prune_config().segments
438 }
439
440 pub fn pruner_builder(&self) -> PrunerBuilder
442 where
443 ChainSpec: reth_chainspec::EthereumHardforks,
444 {
445 PrunerBuilder::new(self.prune_config())
446 }
447
448 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
450 let default_jwt_path = self.data_dir().jwt();
451 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
452 Ok(secret)
453 }
454
455 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
457 where
458 Pool: TransactionPool + Unpin,
459 {
460 self.node_config().dev_mining_mode(pool)
461 }
462}
463
464impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
465where
466 DB: Database + Clone + 'static,
467 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
468{
469 pub async fn create_provider_factory<N, Evm>(
473 &self,
474 changeset_cache: ChangesetCache,
475 rocksdb_provider: Option<RocksDBProvider>,
476 ) -> eyre::Result<ProviderFactory<N>>
477 where
478 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
479 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
480 {
481 let static_files_config = &self.toml_config().static_files;
483 static_files_config.validate()?;
484
485 let static_file_provider =
487 StaticFileProviderBuilder::read_write(self.data_dir().static_files())
488 .with_metrics()
489 .with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
490 .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
491 .build()?;
492
493 let rocksdb_provider = if let Some(provider) = rocksdb_provider {
495 provider
496 } else {
497 RocksDBProvider::builder(self.data_dir().rocksdb())
498 .with_default_tables()
499 .with_metrics()
500 .with_statistics()
501 .build()?
502 };
503
504 let prune_config = self.prune_config();
505 let factory = ProviderFactory::new(
506 self.right().clone(),
507 self.chain_spec(),
508 static_file_provider,
509 rocksdb_provider,
510 self.task_executor().clone(),
511 )?
512 .with_prune_modes(prune_config.segments)
513 .with_minimum_pruning_distance(prune_config.minimum_pruning_distance)
514 .with_changeset_cache(changeset_cache);
515
516 let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;
520
521 let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
523
524 if let Some(unwind_block) = unwind_target {
525 let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
528 (Some(_), Some(_)) => "RocksDB and static file",
529 (Some(_), None) => "RocksDB",
530 (None, Some(_)) => "static file",
531 (None, None) => unreachable!(),
532 };
533 assert_ne!(
534 unwind_block, 0,
535 "A {} inconsistency was found that would trigger an unwind to block 0",
536 inconsistency_source
537 );
538
539 let unwind_target = PipelineTarget::Unwind(unwind_block);
540
541 info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
542
543 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
544
545 let pipeline = PipelineBuilder::default()
547 .add_stages(DefaultStages::new(
548 factory.clone(),
549 tip_rx,
550 Arc::new(NoopConsensus::default()),
551 NoopHeaderDownloader::default(),
552 NoopBodiesDownloader::default(),
553 NoopEvmConfig::<Evm>::default(),
554 self.toml_config().stages.clone(),
555 self.prune_modes(),
556 None,
557 ))
558 .build(
559 factory.clone(),
560 StaticFileProducer::new(factory.clone(), self.prune_modes()),
561 );
562
563 let (tx, rx) = oneshot::channel();
565
566 self.task_executor().spawn_critical_blocking_task("pipeline task", async move {
568 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
569 let _ = tx.send(result);
570 });
571 rx.await?.inspect_err(|err| {
572 error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
573 })?;
574 }
575
576 Ok(factory)
577 }
578
579 pub async fn with_provider_factory<N, Evm>(
581 self,
582 changeset_cache: ChangesetCache,
583 rocksdb_provider: Option<RocksDBProvider>,
584 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
585 where
586 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
587 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
588 {
589 let factory =
590 self.create_provider_factory::<N, Evm>(changeset_cache, rocksdb_provider).await?;
591 let ctx = LaunchContextWith {
592 inner: self.inner,
593 attachment: self.attachment.map_right(|_| factory),
594 };
595
596 Ok(ctx)
597 }
598}
599
600impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
601where
602 T: ProviderNodeTypes,
603{
604 pub const fn database(&self) -> &T::DB {
606 self.right().db_ref()
607 }
608
609 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
611 self.right()
612 }
613
614 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
616 self.right().static_file_provider()
617 }
618
619 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
623 self.start_prometheus_endpoint().await?;
624 Ok(self)
625 }
626
627 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
629 install_prometheus_recorder().spawn_upkeep();
631
632 let listen_addr = self.node_config().metrics.prometheus;
633 if let Some(addr) = listen_addr {
634 let config = MetricServerConfig::new(
635 addr,
636 VersionInfo {
637 version: version_metadata().cargo_pkg_version.as_ref(),
638 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
639 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
640 git_sha: version_metadata().vergen_git_sha.as_ref(),
641 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
642 build_profile: version_metadata().build_profile_name.as_ref(),
643 },
644 ChainSpecInfo { name: self.chain_id().to_string() },
645 self.task_executor().clone(),
646 metrics_hooks(self.provider_factory()),
647 self.data_dir().pprof_dumps(),
648 )
649 .with_push_gateway(
650 self.node_config().metrics.push_gateway_url.clone(),
651 self.node_config().metrics.push_gateway_interval,
652 );
653
654 MetricServer::new(config).serve().await?;
655 }
656
657 Ok(())
658 }
659
660 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
662 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())?;
663 Ok(self)
664 }
665
666 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
668 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())
669 }
670
671 pub fn with_metrics_task(
677 self,
678 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
679 let (metrics_sender, metrics_receiver) = unbounded_channel();
680
681 let with_metrics =
682 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
683
684 debug!(target: "reth::cli", "Spawning stages metrics listener task");
685 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
686 self.task_executor()
687 .spawn_critical_task("stages metrics listener task", sync_metrics_listener);
688
689 LaunchContextWith {
690 inner: self.inner,
691 attachment: self.attachment.map_right(|_| with_metrics),
692 }
693 }
694}
695
696impl<N, DB>
697 LaunchContextWith<
698 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
699 >
700where
701 N: NodeTypes,
702 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
703{
704 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
706 &self.right().provider_factory
707 }
708
709 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
711 self.right().metrics_sender.clone()
712 }
713
714 #[expect(clippy::complexity)]
716 pub fn with_blockchain_db<T, F>(
717 self,
718 create_blockchain_provider: F,
719 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
720 where
721 T: FullNodeTypes<Types = N, DB = DB>,
722 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
723 {
724 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
725
726 let metered_providers = WithMeteredProviders {
727 db_provider_container: WithMeteredProvider {
728 provider_factory: self.provider_factory().clone(),
729 metrics_sender: self.sync_metrics_tx(),
730 },
731 blockchain_db,
732 };
733
734 let ctx = LaunchContextWith {
735 inner: self.inner,
736 attachment: self.attachment.map_right(|_| metered_providers),
737 };
738
739 Ok(ctx)
740 }
741}
742
743impl<T>
744 LaunchContextWith<
745 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
746 >
747where
748 T: FullNodeTypes<Types: NodeTypesForProvider>,
749{
750 pub const fn database(&self) -> &T::DB {
752 self.provider_factory().db_ref()
753 }
754
755 pub const fn provider_factory(
757 &self,
758 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
759 &self.right().db_provider_container.provider_factory
760 }
761
762 pub fn lookup_head(&self) -> eyre::Result<Head> {
766 self.node_config()
767 .lookup_head(self.provider_factory())
768 .wrap_err("the head block is missing")
769 }
770
771 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
773 self.right().db_provider_container.metrics_sender.clone()
774 }
775
776 pub const fn blockchain_db(&self) -> &T::Provider {
778 &self.right().blockchain_db
779 }
780
781 pub async fn with_components<CB>(
783 self,
784 components_builder: CB,
785 on_component_initialized: Box<
786 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
787 >,
788 ) -> eyre::Result<
789 LaunchContextWith<
790 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
791 >,
792 >
793 where
794 CB: NodeComponentsBuilder<T>,
795 {
796 let head = self.lookup_head()?;
798
799 let builder_ctx = BuilderContext::new(
800 head,
801 self.blockchain_db().clone(),
802 self.task_executor().clone(),
803 self.configs().clone(),
804 );
805
806 debug!(target: "reth::cli", "creating components");
807 let components = components_builder.build_components(&builder_ctx).await?;
808
809 let blockchain_db = self.blockchain_db().clone();
810
811 let node_adapter = NodeAdapter {
812 components,
813 task_executor: self.task_executor().clone(),
814 provider: blockchain_db,
815 };
816
817 debug!(target: "reth::cli", "calling on_component_initialized hook");
818 on_component_initialized.on_event(node_adapter.clone())?;
819
820 let components_container = WithComponents {
821 db_provider_container: WithMeteredProvider {
822 provider_factory: self.provider_factory().clone(),
823 metrics_sender: self.sync_metrics_tx(),
824 },
825 node_adapter,
826 head,
827 };
828
829 let ctx = LaunchContextWith {
830 inner: self.inner,
831 attachment: self.attachment.map_right(|_| components_container),
832 };
833
834 Ok(ctx)
835 }
836}
837
838impl<T, CB>
839 LaunchContextWith<
840 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
841 >
842where
843 T: FullNodeTypes<Types: NodeTypesForProvider>,
844 CB: NodeComponentsBuilder<T>,
845{
846 pub const fn provider_factory(
848 &self,
849 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
850 &self.right().db_provider_container.provider_factory
851 }
852
853 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
856 where
857 C: HeadersClient<Header: BlockHeader>,
858 {
859 self.node_config().max_block(client, self.provider_factory().clone()).await
860 }
861
862 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
864 self.provider_factory().static_file_provider()
865 }
866
867 pub fn static_file_producer(
869 &self,
870 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
871 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
872 }
873
874 pub const fn head(&self) -> Head {
876 self.right().head
877 }
878
879 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
881 &self.right().node_adapter
882 }
883
884 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
886 &mut self.right_mut().node_adapter
887 }
888
889 pub const fn blockchain_db(&self) -> &T::Provider {
891 &self.node_adapter().provider
892 }
893
894 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
900 let mut initial_target = self.node_config().debug.tip;
901
902 if initial_target.is_none() {
903 initial_target = self.check_pipeline_consistency()?;
904 }
905
906 Ok(initial_target)
907 }
908
909 pub const fn terminate_after_initial_backfill(&self) -> bool {
915 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
916 }
917
918 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
923 if self.chain_spec().is_optimism() &&
924 !self.is_dev() &&
925 self.chain_id() == Chain::optimism_mainnet()
926 {
927 let latest = self.blockchain_db().last_block_number()?;
928 if latest < 105235063 {
930 error!(
931 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
932 );
933 return Err(ProviderError::BestBlockNotFound);
934 }
935 }
936
937 Ok(())
938 }
939
940 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
952 let era_enabled = self.era_import_source().is_some();
954 let mut all_stages =
955 StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
956
957 let first_stage = all_stages.next().expect("there must be at least one stage");
959
960 let first_stage_checkpoint = self
963 .blockchain_db()
964 .get_stage_checkpoint(first_stage)?
965 .unwrap_or_default()
966 .block_number;
967
968 for stage_id in all_stages {
970 let stage_checkpoint = self
971 .blockchain_db()
972 .get_stage_checkpoint(stage_id)?
973 .unwrap_or_default()
974 .block_number;
975
976 debug!(
979 target: "consensus::engine",
980 first_stage_id = %first_stage,
981 first_stage_checkpoint,
982 stage_id = %stage_id,
983 stage_checkpoint = stage_checkpoint,
984 "Checking stage against first stage",
985 );
986 if stage_checkpoint < first_stage_checkpoint {
987 debug!(
988 target: "consensus::engine",
989 first_stage_id = %first_stage,
990 first_stage_checkpoint,
991 inconsistent_stage_id = %stage_id,
992 inconsistent_stage_checkpoint = stage_checkpoint,
993 "Pipeline sync progress is inconsistent"
994 );
995 return self.blockchain_db().block_hash(first_stage_checkpoint);
996 }
997 }
998
999 self.ensure_chain_specific_db_checks()?;
1000
1001 Ok(None)
1002 }
1003
1004 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
1006 self.right().db_provider_container.metrics_sender.clone()
1007 }
1008
1009 pub const fn components(&self) -> &CB::Components {
1011 &self.node_adapter().components
1012 }
1013
1014 #[allow(clippy::type_complexity)]
1016 pub async fn launch_exex(
1017 &self,
1018 installed_exex: Vec<(
1019 String,
1020 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1021 )>,
1022 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1023 self.exex_launcher(installed_exex).launch().await
1024 }
1025
1026 #[allow(clippy::type_complexity)]
1038 pub fn exex_launcher(
1039 &self,
1040 installed_exex: Vec<(
1041 String,
1042 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1043 )>,
1044 ) -> ExExLauncher<NodeAdapter<T, CB::Components>> {
1045 ExExLauncher::new(
1046 self.head(),
1047 self.node_adapter().clone(),
1048 installed_exex,
1049 self.configs().clone(),
1050 )
1051 }
1052
1053 pub fn era_import_source(&self) -> Option<EraImportSource> {
1057 let node_config = self.node_config();
1058 if !node_config.era.enabled {
1059 return None;
1060 }
1061
1062 EraImportSource::maybe_new(
1063 node_config.era.source.path.clone(),
1064 node_config.era.source.url.clone(),
1065 || node_config.chain.chain().kind().default_era_host(),
1066 || node_config.datadir().data_dir().join("era").into(),
1067 )
1068 }
1069
1070 pub fn consensus_layer_events(
1078 &self,
1079 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1080 where
1081 T::Provider: reth_provider::CanonChainTracker,
1082 {
1083 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1084 Either::Left(
1085 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1086 .map(Into::into),
1087 )
1088 } else {
1089 Either::Right(stream::empty())
1090 }
1091 }
1092
1093 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1095 where
1096 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1097 + Send
1098 + Unpin
1099 + 'static,
1100 {
1101 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1102
1103 let network = self.components().network().clone();
1104 let pool = self.components().pool().clone();
1105 let provider = self.node_adapter().provider.clone();
1106
1107 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1108
1109 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1110
1111 let ethstats_for_events = ethstats.clone();
1113 let task_executor = self.task_executor().clone();
1114 task_executor.spawn_task(async move {
1115 while let Some(event) = engine_events.next().await {
1116 use reth_engine_primitives::ConsensusEngineEvent;
1117 match event {
1118 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1119 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1120 let block_hash = executed.recovered_block.num_hash().hash;
1121 let block_number = executed.recovered_block.num_hash().number;
1122 if let Err(e) = ethstats_for_events
1123 .report_new_payload(block_hash, block_number, duration)
1124 .await
1125 {
1126 debug!(
1127 target: "ethstats",
1128 "Failed to report new payload: {}", e
1129 );
1130 }
1131 }
1132 _ => {
1133 }
1135 }
1136 }
1137 });
1138
1139 task_executor.spawn_task(async move { ethstats.run().await });
1141
1142 Ok(())
1143 }
1144}
1145
/// A pair of two values, referred to as the left and the right side.
///
/// Either side can be replaced independently with [`Attached::map_left`] and
/// [`Attached::map_right`].
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    /// The left value.
    left: L,
    /// The right value.
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a pair from its two sides.
    pub const fn new(left: L, right: R) -> Self {
        Attached { left, right }
    }

    /// Transforms the left side with `f`, keeping the right side as is.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Transforms the right side with `f`, keeping the left side as is.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Returns a reference to the left side.
    pub const fn left(&self) -> &L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a reference to the right side.
    pub const fn right(&self) -> &R {
        let Self { right, .. } = self;
        right
    }

    /// Returns a mutable reference to the left side.
    pub const fn left_mut(&mut self) -> &mut L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a mutable reference to the right side.
    pub const fn right_mut(&mut self) -> &mut R {
        let Self { right, .. } = self;
        right
    }
}
1199
/// The node config together with the loaded TOML config.
#[derive(Debug)]
pub struct WithConfigs<ChainSpec> {
    /// The node configuration.
    pub config: NodeConfig<ChainSpec>,
    /// The configuration loaded from the TOML file.
    pub toml_config: reth_config::Config,
}
1209
1210impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1211 fn clone(&self) -> Self {
1212 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1213 }
1214}
1215
/// A [`ProviderFactory`] bundled with the sender half of the stage metrics channel.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    /// The provider factory.
    provider_factory: ProviderFactory<N>,
    /// Sender for [`MetricEvent`]s.
    metrics_sender: UnboundedSender<MetricEvent>,
}
1223
/// A [`WithMeteredProvider`] together with the blockchain provider.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    /// The provider factory and its metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The blockchain provider.
    blockchain_db: T::Provider,
}
1234
/// A [`WithMeteredProvider`] together with the node adapter (which holds the built components)
/// and a head block.
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    /// The provider factory and its metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The node adapter holding the components, task executor and provider.
    node_adapter: NodeAdapter<T, CB::Components>,
    /// The head block stored alongside the components.
    head: Head,
}
1246
1247pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1249 Hooks::builder()
1250 .with_hook({
1251 let db = provider_factory.db_ref().clone();
1252 move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1253 })
1254 .with_hook({
1255 let sfp = provider_factory.static_file_provider();
1256 move || {
1257 throttle!(Duration::from_secs(5 * 60), || {
1258 if let Err(error) = sfp.report_metrics() {
1259 error!(%error, "Failed to report metrics from static file provider");
1260 }
1261 })
1262 }
1263 })
1264 .with_hook({
1265 let rocksdb = provider_factory.rocksdb_provider();
1266 move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1267 })
1268 .build()
1269}
1270
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// Extension given to the config file created by the tests.
    const EXTENSION: &str = "toml";

    /// Runs `proc` with the path `<tempdir>/<filename>.toml` inside a fresh temporary
    /// directory and closes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let dir = tempfile::tempdir().unwrap();
        let mut path = dir.path().join(filename);
        path.set_extension(EXTENSION);
        proc(&path);
        dir.close().unwrap()
    }

    /// Saving the pruning config for a `full` node writes a file that loads back to the same
    /// configuration as the in-memory one.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let pruning = PruningArgs {
                full: true,
                minimal: false,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_pre_merge: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                bodies_pre_merge: false,
                bodies_distance: None,
                receipts_log_filter: None,
                bodies_before: None,
                minimum_distance: None,
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut in_memory = Config::default();
            LaunchContext::save_pruning_config(&mut in_memory, &node_config, config_path)
                .unwrap();

            let reloaded = Config::from_path(config_path).unwrap();

            assert_eq!(in_memory, reloaded);
        })
    }
}