1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
69 BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
70 ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
71 StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78 StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::{
83 throttle,
84 tracing::{debug, error, info, warn},
85};
86use reth_transaction_pool::TransactionPool;
87use reth_trie_db::ChangesetCache;
88use std::{sync::Arc, thread::available_parallelism, time::Duration};
89use tokio::sync::{
90 mpsc::{unbounded_channel, UnboundedSender},
91 oneshot, watch,
92};
93
94use futures::{future::Either, stream, Stream, StreamExt};
95use reth_node_ethstats::EthStatsService;
96use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
97
98#[derive(Debug, Clone)]
117pub struct LaunchContext {
118 pub task_executor: TaskExecutor,
120 pub data_dir: ChainPath<DataDirPath>,
122}
123
124impl LaunchContext {
125 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
127 Self { task_executor, data_dir }
128 }
129
130 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
132 LaunchContextWith { inner: self, attachment }
133 }
134
135 pub fn with_loaded_toml_config<ChainSpec>(
140 self,
141 config: NodeConfig<ChainSpec>,
142 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
143 where
144 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
145 {
146 let toml_config = self.load_toml_config(&config)?;
147 Ok(self.with(WithConfigs { config, toml_config }))
148 }
149
150 pub fn load_toml_config<ChainSpec>(
155 &self,
156 config: &NodeConfig<ChainSpec>,
157 ) -> eyre::Result<reth_config::Config>
158 where
159 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
160 {
161 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
162
163 let mut toml_config = reth_config::Config::from_path(&config_path)
164 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
165
166 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
167
168 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
169
170 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
172
173 toml_config.static_files =
175 config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
176
177 Ok(toml_config)
178 }
179
180 fn save_pruning_config<ChainSpec>(
183 reth_config: &mut reth_config::Config,
184 config: &NodeConfig<ChainSpec>,
185 config_path: impl AsRef<std::path::Path>,
186 ) -> eyre::Result<()>
187 where
188 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
189 {
190 let mut should_save = reth_config.prune.segments.migrate();
191
192 if let Some(prune_config) = config.prune_config() {
193 if reth_config.prune != prune_config {
194 reth_config.set_prune_config(prune_config);
195 should_save = true;
196 }
197 } else if !reth_config.prune.is_default() {
198 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
199 }
200
201 if should_save {
202 info!(target: "reth::cli", "Saving prune config to toml file");
203 reth_config.save(config_path.as_ref())?;
204 }
205
206 Ok(())
207 }
208
209 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
211 self.configure_globals(reserved_cpu_cores);
212 self
213 }
214
215 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
222 match fdlimit::raise_fd_limit() {
225 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
226 debug!(from, to, "Raised file descriptor limit");
227 }
228 Ok(fdlimit::Outcome::Unsupported) => {}
229 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
230 }
231
232 let num_threads = available_parallelism()
236 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
237 if let Err(err) = ThreadPoolBuilder::new()
238 .num_threads(num_threads)
239 .thread_name(|i| format!("reth-rayon-{i}"))
240 .build_global()
241 {
242 warn!(%err, "Failed to build global thread pool")
243 }
244 }
245}
246
247#[derive(Debug, Clone)]
258pub struct LaunchContextWith<T> {
259 pub inner: LaunchContext,
261 pub attachment: T,
263}
264
265impl<T> LaunchContextWith<T> {
266 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
271 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
272 }
273
274 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
276 &self.inner.data_dir
277 }
278
279 pub const fn task_executor(&self) -> &TaskExecutor {
281 &self.inner.task_executor
282 }
283
284 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
286 LaunchContextWith {
287 inner: self.inner,
288 attachment: Attached::new(self.attachment, attachment),
289 }
290 }
291
292 pub fn inspect<F>(self, f: F) -> Self
295 where
296 F: FnOnce(&Self),
297 {
298 f(&self);
299 self
300 }
301}
302
303impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
304 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
306 if !self.attachment.config.network.trusted_peers.is_empty() {
307 info!(target: "reth::cli", "Adding trusted nodes");
308
309 self.attachment
310 .toml_config
311 .peers
312 .trusted_nodes
313 .extend(self.attachment.config.network.trusted_peers.clone());
314 }
315 Ok(self)
316 }
317}
318
319impl<L, R> LaunchContextWith<Attached<L, R>> {
320 pub const fn left(&self) -> &L {
322 &self.attachment.left
323 }
324
325 pub const fn right(&self) -> &R {
327 &self.attachment.right
328 }
329
330 pub const fn left_mut(&mut self) -> &mut L {
332 &mut self.attachment.left
333 }
334
335 pub const fn right_mut(&mut self) -> &mut R {
337 &mut self.attachment.right
338 }
339}
340impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
341 pub fn with_adjusted_configs(self) -> Self {
347 self.ensure_etl_datadir().with_adjusted_instance_ports()
348 }
349
350 pub fn ensure_etl_datadir(mut self) -> Self {
352 if self.toml_config_mut().stages.etl.dir.is_none() {
353 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
354 if etl_path.exists() {
355 if let Err(err) = fs::remove_dir_all(&etl_path) {
357 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
358 }
359 }
360 self.toml_config_mut().stages.etl.dir = Some(etl_path);
361 }
362
363 self
364 }
365
366 pub fn with_adjusted_instance_ports(mut self) -> Self {
368 self.node_config_mut().adjust_instance_ports();
369 self
370 }
371
372 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
374 self.attachment.left()
375 }
376
377 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
379 &self.left().config
380 }
381
382 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
384 &mut self.left_mut().config
385 }
386
387 pub const fn toml_config(&self) -> &reth_config::Config {
389 &self.left().toml_config
390 }
391
392 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
394 &mut self.left_mut().toml_config
395 }
396
397 pub fn chain_spec(&self) -> Arc<ChainSpec> {
399 self.node_config().chain.clone()
400 }
401
402 pub fn genesis_hash(&self) -> B256 {
404 self.node_config().chain.genesis_hash()
405 }
406
407 pub fn chain_id(&self) -> Chain {
409 self.node_config().chain.chain()
410 }
411
412 pub const fn is_dev(&self) -> bool {
414 self.node_config().dev.dev
415 }
416
417 pub fn prune_config(&self) -> PruneConfig
421 where
422 ChainSpec: reth_chainspec::EthereumHardforks,
423 {
424 let Some(mut node_prune_config) = self.node_config().prune_config() else {
425 return self.toml_config().prune.clone();
427 };
428
429 node_prune_config.merge(self.toml_config().prune.clone());
431 node_prune_config
432 }
433
434 pub fn prune_modes(&self) -> PruneModes
436 where
437 ChainSpec: reth_chainspec::EthereumHardforks,
438 {
439 self.prune_config().segments
440 }
441
442 pub fn pruner_builder(&self) -> PrunerBuilder
444 where
445 ChainSpec: reth_chainspec::EthereumHardforks,
446 {
447 PrunerBuilder::new(self.prune_config())
448 }
449
450 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
452 let default_jwt_path = self.data_dir().jwt();
453 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
454 Ok(secret)
455 }
456
457 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
459 where
460 Pool: TransactionPool + Unpin,
461 {
462 self.node_config().dev_mining_mode(pool)
463 }
464}
465
466impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
467where
468 DB: Database + Clone + 'static,
469 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
470{
471 pub async fn create_provider_factory<N, Evm>(
475 &self,
476 changeset_cache: ChangesetCache,
477 ) -> eyre::Result<ProviderFactory<N>>
478 where
479 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
480 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
481 {
482 let static_files_config = &self.toml_config().static_files;
484 static_files_config.validate()?;
485
486 let static_file_provider =
488 StaticFileProviderBuilder::read_write(self.data_dir().static_files())
489 .with_metrics()
490 .with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
491 .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
492 .build()?;
493
494 let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
496 .with_default_tables()
497 .with_metrics()
498 .with_statistics()
499 .build()?;
500
501 let factory = ProviderFactory::new(
502 self.right().clone(),
503 self.chain_spec(),
504 static_file_provider,
505 rocksdb_provider,
506 )?
507 .with_prune_modes(self.prune_modes())
508 .with_changeset_cache(changeset_cache);
509
510 let provider_ro = factory.database_provider_ro()?;
521
522 factory.static_file_provider().check_file_consistency(&provider_ro)?;
524
525 let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?;
527
528 let static_file_unwind = factory
530 .static_file_provider()
531 .check_consistency(&provider_ro)?
532 .map(|target| match target {
533 PipelineTarget::Unwind(block) => block,
534 PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
535 });
536
537 let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();
539
540 if let Some(unwind_block) = unwind_target {
541 let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
544 (Some(_), Some(_)) => "RocksDB and static file",
545 (Some(_), None) => "RocksDB",
546 (None, Some(_)) => "static file",
547 (None, None) => unreachable!(),
548 };
549 assert_ne!(
550 unwind_block, 0,
551 "A {} inconsistency was found that would trigger an unwind to block 0",
552 inconsistency_source
553 );
554
555 let unwind_target = PipelineTarget::Unwind(unwind_block);
556
557 info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");
558
559 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
560
561 let pipeline = PipelineBuilder::default()
563 .add_stages(DefaultStages::new(
564 factory.clone(),
565 tip_rx,
566 Arc::new(NoopConsensus::default()),
567 NoopHeaderDownloader::default(),
568 NoopBodiesDownloader::default(),
569 NoopEvmConfig::<Evm>::default(),
570 self.toml_config().stages.clone(),
571 self.prune_modes(),
572 None,
573 ))
574 .build(
575 factory.clone(),
576 StaticFileProducer::new(factory.clone(), self.prune_modes()),
577 );
578
579 let (tx, rx) = oneshot::channel();
581
582 self.task_executor().spawn_critical_blocking(
584 "pipeline task",
585 Box::pin(async move {
586 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
587 let _ = tx.send(result);
588 }),
589 );
590 rx.await?.inspect_err(|err| {
591 error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
592 })?;
593 }
594
595 Ok(factory)
596 }
597
598 pub async fn with_provider_factory<N, Evm>(
600 self,
601 changeset_cache: ChangesetCache,
602 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
603 where
604 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
605 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
606 {
607 let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
608 let ctx = LaunchContextWith {
609 inner: self.inner,
610 attachment: self.attachment.map_right(|_| factory),
611 };
612
613 Ok(ctx)
614 }
615}
616
617impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
618where
619 T: ProviderNodeTypes,
620{
621 pub const fn database(&self) -> &T::DB {
623 self.right().db_ref()
624 }
625
626 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
628 self.right()
629 }
630
631 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
633 self.right().static_file_provider()
634 }
635
636 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
640 self.start_prometheus_endpoint().await?;
641 Ok(self)
642 }
643
644 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
646 install_prometheus_recorder().spawn_upkeep();
648
649 let listen_addr = self.node_config().metrics.prometheus;
650 if let Some(addr) = listen_addr {
651 let config = MetricServerConfig::new(
652 addr,
653 VersionInfo {
654 version: version_metadata().cargo_pkg_version.as_ref(),
655 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
656 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
657 git_sha: version_metadata().vergen_git_sha.as_ref(),
658 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
659 build_profile: version_metadata().build_profile_name.as_ref(),
660 },
661 ChainSpecInfo { name: self.chain_id().to_string() },
662 self.task_executor().clone(),
663 metrics_hooks(self.provider_factory()),
664 self.data_dir().pprof_dumps(),
665 )
666 .with_push_gateway(
667 self.node_config().metrics.push_gateway_url.clone(),
668 self.node_config().metrics.push_gateway_interval,
669 );
670
671 MetricServer::new(config).serve().await?;
672 }
673
674 Ok(())
675 }
676
677 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
679 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())?;
680 Ok(self)
681 }
682
683 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
685 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())
686 }
687
688 pub fn with_metrics_task(
694 self,
695 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
696 let (metrics_sender, metrics_receiver) = unbounded_channel();
697
698 let with_metrics =
699 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
700
701 debug!(target: "reth::cli", "Spawning stages metrics listener task");
702 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
703 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
704
705 LaunchContextWith {
706 inner: self.inner,
707 attachment: self.attachment.map_right(|_| with_metrics),
708 }
709 }
710}
711
712impl<N, DB>
713 LaunchContextWith<
714 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
715 >
716where
717 N: NodeTypes,
718 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
719{
720 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
722 &self.right().provider_factory
723 }
724
725 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
727 self.right().metrics_sender.clone()
728 }
729
730 #[expect(clippy::complexity)]
732 pub fn with_blockchain_db<T, F>(
733 self,
734 create_blockchain_provider: F,
735 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
736 where
737 T: FullNodeTypes<Types = N, DB = DB>,
738 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
739 {
740 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
741
742 let metered_providers = WithMeteredProviders {
743 db_provider_container: WithMeteredProvider {
744 provider_factory: self.provider_factory().clone(),
745 metrics_sender: self.sync_metrics_tx(),
746 },
747 blockchain_db,
748 };
749
750 let ctx = LaunchContextWith {
751 inner: self.inner,
752 attachment: self.attachment.map_right(|_| metered_providers),
753 };
754
755 Ok(ctx)
756 }
757}
758
759impl<T>
760 LaunchContextWith<
761 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
762 >
763where
764 T: FullNodeTypes<Types: NodeTypesForProvider>,
765{
766 pub const fn database(&self) -> &T::DB {
768 self.provider_factory().db_ref()
769 }
770
771 pub const fn provider_factory(
773 &self,
774 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
775 &self.right().db_provider_container.provider_factory
776 }
777
778 pub fn lookup_head(&self) -> eyre::Result<Head> {
782 self.node_config()
783 .lookup_head(self.provider_factory())
784 .wrap_err("the head block is missing")
785 }
786
787 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
789 self.right().db_provider_container.metrics_sender.clone()
790 }
791
792 pub const fn blockchain_db(&self) -> &T::Provider {
794 &self.right().blockchain_db
795 }
796
797 pub async fn with_components<CB>(
799 self,
800 components_builder: CB,
801 on_component_initialized: Box<
802 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
803 >,
804 ) -> eyre::Result<
805 LaunchContextWith<
806 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
807 >,
808 >
809 where
810 CB: NodeComponentsBuilder<T>,
811 {
812 let head = self.lookup_head()?;
814
815 let builder_ctx = BuilderContext::new(
816 head,
817 self.blockchain_db().clone(),
818 self.task_executor().clone(),
819 self.configs().clone(),
820 );
821
822 debug!(target: "reth::cli", "creating components");
823 let components = components_builder.build_components(&builder_ctx).await?;
824
825 let blockchain_db = self.blockchain_db().clone();
826
827 let node_adapter = NodeAdapter {
828 components,
829 task_executor: self.task_executor().clone(),
830 provider: blockchain_db,
831 };
832
833 debug!(target: "reth::cli", "calling on_component_initialized hook");
834 on_component_initialized.on_event(node_adapter.clone())?;
835
836 let components_container = WithComponents {
837 db_provider_container: WithMeteredProvider {
838 provider_factory: self.provider_factory().clone(),
839 metrics_sender: self.sync_metrics_tx(),
840 },
841 node_adapter,
842 head,
843 };
844
845 let ctx = LaunchContextWith {
846 inner: self.inner,
847 attachment: self.attachment.map_right(|_| components_container),
848 };
849
850 Ok(ctx)
851 }
852}
853
854impl<T, CB>
855 LaunchContextWith<
856 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
857 >
858where
859 T: FullNodeTypes<Types: NodeTypesForProvider>,
860 CB: NodeComponentsBuilder<T>,
861{
862 pub const fn provider_factory(
864 &self,
865 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
866 &self.right().db_provider_container.provider_factory
867 }
868
869 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
872 where
873 C: HeadersClient<Header: BlockHeader>,
874 {
875 self.node_config().max_block(client, self.provider_factory().clone()).await
876 }
877
878 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
880 self.provider_factory().static_file_provider()
881 }
882
883 pub fn static_file_producer(
885 &self,
886 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
887 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
888 }
889
890 pub const fn head(&self) -> Head {
892 self.right().head
893 }
894
895 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
897 &self.right().node_adapter
898 }
899
900 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
902 &mut self.right_mut().node_adapter
903 }
904
905 pub const fn blockchain_db(&self) -> &T::Provider {
907 &self.node_adapter().provider
908 }
909
910 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
916 let mut initial_target = self.node_config().debug.tip;
917
918 if initial_target.is_none() {
919 initial_target = self.check_pipeline_consistency()?;
920 }
921
922 Ok(initial_target)
923 }
924
925 pub const fn terminate_after_initial_backfill(&self) -> bool {
931 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
932 }
933
934 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
939 if self.chain_spec().is_optimism() &&
940 !self.is_dev() &&
941 self.chain_id() == Chain::optimism_mainnet()
942 {
943 let latest = self.blockchain_db().last_block_number()?;
944 if latest < 105235063 {
946 error!(
947 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
948 );
949 return Err(ProviderError::BestBlockNotFound);
950 }
951 }
952
953 Ok(())
954 }
955
956 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
968 let era_enabled = self.era_import_source().is_some();
970 let mut all_stages =
971 StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
972
973 let first_stage = all_stages.next().expect("there must be at least one stage");
975
976 let first_stage_checkpoint = self
979 .blockchain_db()
980 .get_stage_checkpoint(first_stage)?
981 .unwrap_or_default()
982 .block_number;
983
984 for stage_id in all_stages {
986 let stage_checkpoint = self
987 .blockchain_db()
988 .get_stage_checkpoint(stage_id)?
989 .unwrap_or_default()
990 .block_number;
991
992 debug!(
995 target: "consensus::engine",
996 first_stage_id = %first_stage,
997 first_stage_checkpoint,
998 stage_id = %stage_id,
999 stage_checkpoint = stage_checkpoint,
1000 "Checking stage against first stage",
1001 );
1002 if stage_checkpoint < first_stage_checkpoint {
1003 debug!(
1004 target: "consensus::engine",
1005 first_stage_id = %first_stage,
1006 first_stage_checkpoint,
1007 inconsistent_stage_id = %stage_id,
1008 inconsistent_stage_checkpoint = stage_checkpoint,
1009 "Pipeline sync progress is inconsistent"
1010 );
1011 return self.blockchain_db().block_hash(first_stage_checkpoint);
1012 }
1013 }
1014
1015 self.ensure_chain_specific_db_checks()?;
1016
1017 Ok(None)
1018 }
1019
1020 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
1022 self.right().db_provider_container.metrics_sender.clone()
1023 }
1024
1025 pub const fn components(&self) -> &CB::Components {
1027 &self.node_adapter().components
1028 }
1029
1030 #[allow(clippy::type_complexity)]
1032 pub async fn launch_exex(
1033 &self,
1034 installed_exex: Vec<(
1035 String,
1036 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1037 )>,
1038 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1039 self.exex_launcher(installed_exex).launch().await
1040 }
1041
1042 #[allow(clippy::type_complexity)]
1054 pub fn exex_launcher(
1055 &self,
1056 installed_exex: Vec<(
1057 String,
1058 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1059 )>,
1060 ) -> ExExLauncher<NodeAdapter<T, CB::Components>> {
1061 ExExLauncher::new(
1062 self.head(),
1063 self.node_adapter().clone(),
1064 installed_exex,
1065 self.configs().clone(),
1066 )
1067 }
1068
1069 pub fn era_import_source(&self) -> Option<EraImportSource> {
1073 let node_config = self.node_config();
1074 if !node_config.era.enabled {
1075 return None;
1076 }
1077
1078 EraImportSource::maybe_new(
1079 node_config.era.source.path.clone(),
1080 node_config.era.source.url.clone(),
1081 || node_config.chain.chain().kind().default_era_host(),
1082 || node_config.datadir().data_dir().join("era").into(),
1083 )
1084 }
1085
1086 pub fn consensus_layer_events(
1094 &self,
1095 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1096 where
1097 T::Provider: reth_provider::CanonChainTracker,
1098 {
1099 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1100 Either::Left(
1101 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1102 .map(Into::into),
1103 )
1104 } else {
1105 Either::Right(stream::empty())
1106 }
1107 }
1108
1109 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1111 where
1112 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1113 + Send
1114 + Unpin
1115 + 'static,
1116 {
1117 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1118
1119 let network = self.components().network().clone();
1120 let pool = self.components().pool().clone();
1121 let provider = self.node_adapter().provider.clone();
1122
1123 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1124
1125 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1126
1127 let ethstats_for_events = ethstats.clone();
1129 let task_executor = self.task_executor().clone();
1130 task_executor.spawn(Box::pin(async move {
1131 while let Some(event) = engine_events.next().await {
1132 use reth_engine_primitives::ConsensusEngineEvent;
1133 match event {
1134 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1135 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1136 let block_hash = executed.recovered_block.num_hash().hash;
1137 let block_number = executed.recovered_block.num_hash().number;
1138 if let Err(e) = ethstats_for_events
1139 .report_new_payload(block_hash, block_number, duration)
1140 .await
1141 {
1142 debug!(
1143 target: "ethstats",
1144 "Failed to report new payload: {}", e
1145 );
1146 }
1147 }
1148 _ => {
1149 }
1151 }
1152 }
1153 }));
1154
1155 task_executor.spawn(Box::pin(async move { ethstats.run().await }));
1157
1158 Ok(())
1159 }
1160}
1161
1162#[derive(Clone, Copy, Debug)]
1168pub struct Attached<L, R> {
1169 left: L,
1170 right: R,
1171}
1172
1173impl<L, R> Attached<L, R> {
1174 pub const fn new(left: L, right: R) -> Self {
1176 Self { left, right }
1177 }
1178
1179 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
1181 where
1182 F: FnOnce(L) -> T,
1183 {
1184 Attached::new(f(self.left), self.right)
1185 }
1186
1187 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
1189 where
1190 F: FnOnce(R) -> T,
1191 {
1192 Attached::new(self.left, f(self.right))
1193 }
1194
1195 pub const fn left(&self) -> &L {
1197 &self.left
1198 }
1199
1200 pub const fn right(&self) -> &R {
1202 &self.right
1203 }
1204
1205 pub const fn left_mut(&mut self) -> &mut L {
1207 &mut self.left
1208 }
1209
1210 pub const fn right_mut(&mut self) -> &mut R {
1212 &mut self.right
1213 }
1214}
1215
1216#[derive(Debug)]
1219pub struct WithConfigs<ChainSpec> {
1220 pub config: NodeConfig<ChainSpec>,
1222 pub toml_config: reth_config::Config,
1224}
1225
1226impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1227 fn clone(&self) -> Self {
1228 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1229 }
1230}
1231
1232#[derive(Debug, Clone)]
1235pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1236 provider_factory: ProviderFactory<N>,
1237 metrics_sender: UnboundedSender<MetricEvent>,
1238}
1239
1240#[expect(missing_debug_implementations)]
1243pub struct WithMeteredProviders<T>
1244where
1245 T: FullNodeTypes,
1246{
1247 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1248 blockchain_db: T::Provider,
1249}
1250
1251#[expect(missing_debug_implementations)]
1253pub struct WithComponents<T, CB>
1254where
1255 T: FullNodeTypes,
1256 CB: NodeComponentsBuilder<T>,
1257{
1258 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1259 node_adapter: NodeAdapter<T, CB::Components>,
1260 head: Head,
1261}
1262
1263pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1265 Hooks::builder()
1266 .with_hook({
1267 let db = provider_factory.db_ref().clone();
1268 move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1269 })
1270 .with_hook({
1271 let sfp = provider_factory.static_file_provider();
1272 move || {
1273 throttle!(Duration::from_secs(5 * 60), || {
1274 if let Err(error) = sfp.report_metrics() {
1275 error!(%error, "Failed to report metrics from static file provider");
1276 }
1277 })
1278 }
1279 })
1280 .with_hook({
1281 let rocksdb = provider_factory.rocksdb_provider();
1282 move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1283 })
1284 .build()
1285}
1286
1287#[cfg(test)]
1288mod tests {
1289 use super::{LaunchContext, NodeConfig};
1290 use reth_config::Config;
1291 use reth_node_core::args::PruningArgs;
1292
1293 const EXTENSION: &str = "toml";
1294
1295 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1296 let temp_dir = tempfile::tempdir().unwrap();
1297 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1298 proc(&config_path);
1299 temp_dir.close().unwrap()
1300 }
1301
1302 #[test]
1303 fn test_save_prune_config() {
1304 with_tempdir("prune-store-test", |config_path| {
1305 let mut reth_config = Config::default();
1306 let node_config = NodeConfig {
1307 pruning: PruningArgs {
1308 full: true,
1309 minimal: false,
1310 block_interval: None,
1311 sender_recovery_full: false,
1312 sender_recovery_distance: None,
1313 sender_recovery_before: None,
1314 transaction_lookup_full: false,
1315 transaction_lookup_distance: None,
1316 transaction_lookup_before: None,
1317 receipts_full: false,
1318 receipts_pre_merge: false,
1319 receipts_distance: None,
1320 receipts_before: None,
1321 account_history_full: false,
1322 account_history_distance: None,
1323 account_history_before: None,
1324 storage_history_full: false,
1325 storage_history_distance: None,
1326 storage_history_before: None,
1327 bodies_pre_merge: false,
1328 bodies_distance: None,
1329 receipts_log_filter: None,
1330 bodies_before: None,
1331 },
1332 ..NodeConfig::test()
1333 };
1334 LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
1335 .unwrap();
1336
1337 let loaded_config = Config::from_path(config_path).unwrap();
1338
1339 assert_eq!(reth_config, loaded_config);
1340 })
1341 }
1342}