1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
69 BlockHashReader, BlockNumReader, DatabaseProviderFactory, ProviderError, ProviderFactory,
70 ProviderResult, RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
71 StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78 StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::tracing::{debug, error, info, warn};
83use reth_transaction_pool::TransactionPool;
84use std::{sync::Arc, thread::available_parallelism};
85use tokio::sync::{
86 mpsc::{unbounded_channel, UnboundedSender},
87 oneshot, watch,
88};
89
90use futures::{future::Either, stream, Stream, StreamExt};
91use reth_node_ethstats::EthStatsService;
92use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
93
/// Base state shared by every step of the node launch sequence.
///
/// Additional state is layered on top of this type through [`LaunchContext::with`], which wraps
/// it in a [`LaunchContextWith`].
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// Executor that the launch helpers in this file use to spawn tasks.
    pub task_executor: TaskExecutor,
    /// Data directory of the node; the config, JWT, static-file and RocksDB paths used in this
    /// file are all derived from it.
    pub data_dir: ChainPath<DataDirPath>,
}
119
120impl LaunchContext {
121 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
123 Self { task_executor, data_dir }
124 }
125
126 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
128 LaunchContextWith { inner: self, attachment }
129 }
130
131 pub fn with_loaded_toml_config<ChainSpec>(
136 self,
137 config: NodeConfig<ChainSpec>,
138 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
139 where
140 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
141 {
142 let toml_config = self.load_toml_config(&config)?;
143 Ok(self.with(WithConfigs { config, toml_config }))
144 }
145
146 pub fn load_toml_config<ChainSpec>(
151 &self,
152 config: &NodeConfig<ChainSpec>,
153 ) -> eyre::Result<reth_config::Config>
154 where
155 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
156 {
157 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
158
159 let mut toml_config = reth_config::Config::from_path(&config_path)
160 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
161
162 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
163
164 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
165
166 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
168
169 toml_config.static_files = config.static_files.merge_with_config(toml_config.static_files);
171
172 Ok(toml_config)
173 }
174
175 fn save_pruning_config<ChainSpec>(
178 reth_config: &mut reth_config::Config,
179 config: &NodeConfig<ChainSpec>,
180 config_path: impl AsRef<std::path::Path>,
181 ) -> eyre::Result<()>
182 where
183 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
184 {
185 let mut should_save = reth_config.prune.segments.migrate();
186
187 if let Some(prune_config) = config.prune_config() {
188 if reth_config.prune != prune_config {
189 reth_config.set_prune_config(prune_config);
190 should_save = true;
191 }
192 } else if !reth_config.prune.is_default() {
193 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
194 }
195
196 if should_save {
197 info!(target: "reth::cli", "Saving prune config to toml file");
198 reth_config.save(config_path.as_ref())?;
199 }
200
201 Ok(())
202 }
203
204 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
206 self.configure_globals(reserved_cpu_cores);
207 self
208 }
209
210 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
217 match fdlimit::raise_fd_limit() {
220 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
221 debug!(from, to, "Raised file descriptor limit");
222 }
223 Ok(fdlimit::Outcome::Unsupported) => {}
224 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
225 }
226
227 let num_threads = available_parallelism()
231 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
232 if let Err(err) = ThreadPoolBuilder::new()
233 .num_threads(num_threads)
234 .thread_name(|i| format!("reth-rayon-{i}"))
235 .build_global()
236 {
237 warn!(%err, "Failed to build global thread pool")
238 }
239 }
240}
241
/// A [`LaunchContext`] bundled with an additional value of type `T`.
///
/// The impl blocks in this file specialise on `T` to expose helpers that depend on what has
/// been attached so far.
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped base context (task executor and data directory).
    pub inner: LaunchContext,
    /// The value carried alongside the base context.
    pub attachment: T,
}
259
260impl<T> LaunchContextWith<T> {
261 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
266 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
267 }
268
269 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
271 &self.inner.data_dir
272 }
273
274 pub const fn task_executor(&self) -> &TaskExecutor {
276 &self.inner.task_executor
277 }
278
279 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
281 LaunchContextWith {
282 inner: self.inner,
283 attachment: Attached::new(self.attachment, attachment),
284 }
285 }
286
287 pub fn inspect<F>(self, f: F) -> Self
290 where
291 F: FnOnce(&Self),
292 {
293 f(&self);
294 self
295 }
296}
297
298impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
299 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
301 if !self.attachment.config.network.trusted_peers.is_empty() {
302 info!(target: "reth::cli", "Adding trusted nodes");
303
304 self.attachment
305 .toml_config
306 .peers
307 .trusted_nodes
308 .extend(self.attachment.config.network.trusted_peers.clone());
309 }
310 Ok(self)
311 }
312}
313
314impl<L, R> LaunchContextWith<Attached<L, R>> {
315 pub const fn left(&self) -> &L {
317 &self.attachment.left
318 }
319
320 pub const fn right(&self) -> &R {
322 &self.attachment.right
323 }
324
325 pub const fn left_mut(&mut self) -> &mut L {
327 &mut self.attachment.left
328 }
329
330 pub const fn right_mut(&mut self) -> &mut R {
332 &mut self.attachment.right
333 }
334}
335impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
336 pub fn with_adjusted_configs(self) -> Self {
342 self.ensure_etl_datadir().with_adjusted_instance_ports()
343 }
344
345 pub fn ensure_etl_datadir(mut self) -> Self {
347 if self.toml_config_mut().stages.etl.dir.is_none() {
348 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
349 if etl_path.exists() {
350 if let Err(err) = fs::remove_dir_all(&etl_path) {
352 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
353 }
354 }
355 self.toml_config_mut().stages.etl.dir = Some(etl_path);
356 }
357
358 self
359 }
360
361 pub fn with_adjusted_instance_ports(mut self) -> Self {
363 self.node_config_mut().adjust_instance_ports();
364 self
365 }
366
367 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
369 self.attachment.left()
370 }
371
372 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
374 &self.left().config
375 }
376
377 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
379 &mut self.left_mut().config
380 }
381
382 pub const fn toml_config(&self) -> &reth_config::Config {
384 &self.left().toml_config
385 }
386
387 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
389 &mut self.left_mut().toml_config
390 }
391
392 pub fn chain_spec(&self) -> Arc<ChainSpec> {
394 self.node_config().chain.clone()
395 }
396
397 pub fn genesis_hash(&self) -> B256 {
399 self.node_config().chain.genesis_hash()
400 }
401
402 pub fn chain_id(&self) -> Chain {
404 self.node_config().chain.chain()
405 }
406
407 pub const fn is_dev(&self) -> bool {
409 self.node_config().dev.dev
410 }
411
412 pub fn prune_config(&self) -> PruneConfig
416 where
417 ChainSpec: reth_chainspec::EthereumHardforks,
418 {
419 let Some(mut node_prune_config) = self.node_config().prune_config() else {
420 return self.toml_config().prune.clone();
422 };
423
424 node_prune_config.merge(self.toml_config().prune.clone());
426 node_prune_config
427 }
428
429 pub fn prune_modes(&self) -> PruneModes
431 where
432 ChainSpec: reth_chainspec::EthereumHardforks,
433 {
434 self.prune_config().segments
435 }
436
437 pub fn pruner_builder(&self) -> PrunerBuilder
439 where
440 ChainSpec: reth_chainspec::EthereumHardforks,
441 {
442 PrunerBuilder::new(self.prune_config())
443 }
444
445 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
447 let default_jwt_path = self.data_dir().jwt();
448 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
449 Ok(secret)
450 }
451
452 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
454 where
455 Pool: TransactionPool + Unpin,
456 {
457 self.node_config().dev_mining_mode(pool)
458 }
459}
460
impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
where
    DB: Database + Clone + 'static,
    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
{
    /// Builds the [`ProviderFactory`] for the attached database and brings storage back into a
    /// consistent state before returning it.
    ///
    /// Steps, in order:
    /// 1. validate the static-file settings and open the static-file and RocksDB providers
    ///    under the data directory;
    /// 2. create the factory with the effective prune modes;
    /// 3. run the static-file and RocksDB consistency checks against a read-only database
    ///    provider;
    /// 4. if any check asks for an unwind, run an unwind-only pipeline to the lowest requested
    ///    block and wait for it to finish.
    ///
    /// # Errors
    ///
    /// Returns an error when any provider cannot be built, a consistency check fails, or the
    /// unwind pipeline reports an error.
    ///
    /// # Panics
    ///
    /// Panics when the computed unwind target is block `0`.
    pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        // Reject invalid static-file settings before touching any files.
        let static_files_config = &self.toml_config().static_files;
        static_files_config.validate()?;

        let static_file_provider =
            StaticFileProviderBuilder::read_write(self.data_dir().static_files())?
                .with_metrics()
                .with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
                // A chain spec without an explicit genesis number is treated as starting at 0.
                .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
                .build()?;

        let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
            .with_default_tables()
            .with_metrics()
            .with_statistics()
            .build()?;

        let factory = ProviderFactory::new(
            self.right().clone(),
            self.chain_spec(),
            static_file_provider,
            rocksdb_provider,
        )?
        .with_prune_modes(self.prune_modes());

        // All consistency checks below share this one read-only database provider.
        let provider_ro = factory.database_provider_ro()?;

        // NOTE(review): the order of the three checks looks deliberate (file-level check first,
        // then RocksDB, then the static-file check that can request an unwind) - confirm before
        // reordering.
        factory.static_file_provider().check_file_consistency(&provider_ro)?;

        let rocksdb_unwind = factory.rocksdb_provider().check_consistency(&provider_ro)?;

        let static_file_unwind = factory
            .static_file_provider()
            .check_consistency(&provider_ro)?
            .map(|target| match target {
                PipelineTarget::Unwind(block) => block,
                PipelineTarget::Sync(_) => unreachable!("check_consistency returns Unwind"),
            });

        // Unwind to the lowest block requested by either check, if any.
        let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();

        if let Some(unwind_block) = unwind_target {
            // Human-readable origin of the inconsistency, used in the panic and log messages.
            let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
                (Some(_), Some(_)) => "RocksDB and static file",
                (Some(_), None) => "RocksDB",
                (None, Some(_)) => "static file",
                // `unwind_target` is `Some`, so at least one of the two sources is `Some`.
                (None, None) => unreachable!(),
            };
            // An unwind all the way to block 0 is treated as fatal instead of being executed.
            assert_ne!(
                unwind_block, 0,
                "A {} inconsistency was found that would trigger an unwind to block 0",
                inconsistency_source
            );

            let unwind_target = PipelineTarget::Unwind(unwind_block);

            info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");

            // The sender half is bound to `_tip_tx` and stays alive until the end of this block.
            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

            // The pipeline is built with no-op consensus, downloaders and EVM config; it is only
            // ever run with the unwind target below.
            let pipeline = PipelineBuilder::default()
                .add_stages(DefaultStages::new(
                    factory.clone(),
                    tip_rx,
                    Arc::new(NoopConsensus::default()),
                    NoopHeaderDownloader::default(),
                    NoopBodiesDownloader::default(),
                    NoopEvmConfig::<Evm>::default(),
                    self.toml_config().stages.clone(),
                    self.prune_modes(),
                    None,
                ))
                .build(
                    factory.clone(),
                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
                );

            // The pipeline runs on a separately spawned task; its result comes back over this
            // oneshot channel.
            let (tx, rx) = oneshot::channel();

            self.task_executor().spawn_critical_blocking(
                "pipeline task",
                Box::pin(async move {
                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
                    // The receiver may already be gone; a failed send is ignored.
                    let _ = tx.send(result);
                }),
            );
            // The first `?` covers a dropped sender, the second the pipeline result itself.
            rx.await?.inspect_err(|err| {
                error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
            })?;
        }

        Ok(factory)
    }

    /// Creates the provider factory via [`Self::create_provider_factory`] and swaps it in for
    /// the attached database, which is dropped.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::create_provider_factory`].
    pub async fn with_provider_factory<N, Evm>(
        self,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let factory = self.create_provider_factory::<N, Evm>().await?;
        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| factory),
        };

        Ok(ctx)
    }
}
606
607impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
608where
609 T: ProviderNodeTypes,
610{
611 pub const fn database(&self) -> &T::DB {
613 self.right().db_ref()
614 }
615
616 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
618 self.right()
619 }
620
621 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
623 self.right().static_file_provider()
624 }
625
626 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
630 self.start_prometheus_endpoint().await?;
631 Ok(self)
632 }
633
634 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
636 install_prometheus_recorder().spawn_upkeep();
638
639 let listen_addr = self.node_config().metrics.prometheus;
640 if let Some(addr) = listen_addr {
641 let config = MetricServerConfig::new(
642 addr,
643 VersionInfo {
644 version: version_metadata().cargo_pkg_version.as_ref(),
645 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
646 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
647 git_sha: version_metadata().vergen_git_sha.as_ref(),
648 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
649 build_profile: version_metadata().build_profile_name.as_ref(),
650 },
651 ChainSpecInfo { name: self.chain_id().to_string() },
652 self.task_executor().clone(),
653 Hooks::builder()
654 .with_hook({
655 let db = self.database().clone();
656 move || db.report_metrics()
657 })
658 .with_hook({
659 let sfp = self.static_file_provider();
660 move || {
661 if let Err(error) = sfp.report_metrics() {
662 error!(%error, "Failed to report metrics for the static file provider");
663 }
664 }
665 })
666 .build(),
667 ).with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
668
669 MetricServer::new(config).serve().await?;
670 }
671
672 Ok(())
673 }
674
675 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
677 init_genesis_with_settings(
678 self.provider_factory(),
679 self.node_config().static_files.to_settings(),
680 )?;
681 Ok(self)
682 }
683
684 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
686 init_genesis_with_settings(
687 self.provider_factory(),
688 self.node_config().static_files.to_settings(),
689 )
690 }
691
692 pub fn with_metrics_task(
698 self,
699 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
700 let (metrics_sender, metrics_receiver) = unbounded_channel();
701
702 let with_metrics =
703 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
704
705 debug!(target: "reth::cli", "Spawning stages metrics listener task");
706 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
707 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
708
709 LaunchContextWith {
710 inner: self.inner,
711 attachment: self.attachment.map_right(|_| with_metrics),
712 }
713 }
714}
715
716impl<N, DB>
717 LaunchContextWith<
718 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
719 >
720where
721 N: NodeTypes,
722 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
723{
724 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
726 &self.right().provider_factory
727 }
728
729 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
731 self.right().metrics_sender.clone()
732 }
733
734 #[expect(clippy::complexity)]
736 pub fn with_blockchain_db<T, F>(
737 self,
738 create_blockchain_provider: F,
739 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
740 where
741 T: FullNodeTypes<Types = N, DB = DB>,
742 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
743 {
744 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
745
746 let metered_providers = WithMeteredProviders {
747 db_provider_container: WithMeteredProvider {
748 provider_factory: self.provider_factory().clone(),
749 metrics_sender: self.sync_metrics_tx(),
750 },
751 blockchain_db,
752 };
753
754 let ctx = LaunchContextWith {
755 inner: self.inner,
756 attachment: self.attachment.map_right(|_| metered_providers),
757 };
758
759 Ok(ctx)
760 }
761}
762
763impl<T>
764 LaunchContextWith<
765 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
766 >
767where
768 T: FullNodeTypes<Types: NodeTypesForProvider>,
769{
770 pub const fn database(&self) -> &T::DB {
772 self.provider_factory().db_ref()
773 }
774
775 pub const fn provider_factory(
777 &self,
778 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
779 &self.right().db_provider_container.provider_factory
780 }
781
782 pub fn lookup_head(&self) -> eyre::Result<Head> {
786 self.node_config()
787 .lookup_head(self.provider_factory())
788 .wrap_err("the head block is missing")
789 }
790
791 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
793 self.right().db_provider_container.metrics_sender.clone()
794 }
795
796 pub const fn blockchain_db(&self) -> &T::Provider {
798 &self.right().blockchain_db
799 }
800
801 pub async fn with_components<CB>(
803 self,
804 components_builder: CB,
805 on_component_initialized: Box<
806 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
807 >,
808 ) -> eyre::Result<
809 LaunchContextWith<
810 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
811 >,
812 >
813 where
814 CB: NodeComponentsBuilder<T>,
815 {
816 let head = self.lookup_head()?;
818
819 let builder_ctx = BuilderContext::new(
820 head,
821 self.blockchain_db().clone(),
822 self.task_executor().clone(),
823 self.configs().clone(),
824 );
825
826 debug!(target: "reth::cli", "creating components");
827 let components = components_builder.build_components(&builder_ctx).await?;
828
829 let blockchain_db = self.blockchain_db().clone();
830
831 let node_adapter = NodeAdapter {
832 components,
833 task_executor: self.task_executor().clone(),
834 provider: blockchain_db,
835 };
836
837 debug!(target: "reth::cli", "calling on_component_initialized hook");
838 on_component_initialized.on_event(node_adapter.clone())?;
839
840 let components_container = WithComponents {
841 db_provider_container: WithMeteredProvider {
842 provider_factory: self.provider_factory().clone(),
843 metrics_sender: self.sync_metrics_tx(),
844 },
845 node_adapter,
846 head,
847 };
848
849 let ctx = LaunchContextWith {
850 inner: self.inner,
851 attachment: self.attachment.map_right(|_| components_container),
852 };
853
854 Ok(ctx)
855 }
856}
857
858impl<T, CB>
859 LaunchContextWith<
860 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
861 >
862where
863 T: FullNodeTypes<Types: NodeTypesForProvider>,
864 CB: NodeComponentsBuilder<T>,
865{
866 pub const fn provider_factory(
868 &self,
869 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
870 &self.right().db_provider_container.provider_factory
871 }
872
873 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
876 where
877 C: HeadersClient<Header: BlockHeader>,
878 {
879 self.node_config().max_block(client, self.provider_factory().clone()).await
880 }
881
882 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
884 self.provider_factory().static_file_provider()
885 }
886
887 pub fn static_file_producer(
889 &self,
890 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
891 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
892 }
893
894 pub const fn head(&self) -> Head {
896 self.right().head
897 }
898
899 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
901 &self.right().node_adapter
902 }
903
904 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
906 &mut self.right_mut().node_adapter
907 }
908
909 pub const fn blockchain_db(&self) -> &T::Provider {
911 &self.node_adapter().provider
912 }
913
914 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
920 let mut initial_target = self.node_config().debug.tip;
921
922 if initial_target.is_none() {
923 initial_target = self.check_pipeline_consistency()?;
924 }
925
926 Ok(initial_target)
927 }
928
929 pub const fn terminate_after_initial_backfill(&self) -> bool {
935 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
936 }
937
938 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
943 if self.chain_spec().is_optimism() &&
944 !self.is_dev() &&
945 self.chain_id() == Chain::optimism_mainnet()
946 {
947 let latest = self.blockchain_db().last_block_number()?;
948 if latest < 105235063 {
950 error!(
951 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
952 );
953 return Err(ProviderError::BestBlockNotFound)
954 }
955 }
956
957 Ok(())
958 }
959
960 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
972 let era_enabled = self.era_import_source().is_some();
974 let mut all_stages =
975 StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
976
977 let first_stage = all_stages.next().expect("there must be at least one stage");
979
980 let first_stage_checkpoint = self
983 .blockchain_db()
984 .get_stage_checkpoint(first_stage)?
985 .unwrap_or_default()
986 .block_number;
987
988 for stage_id in all_stages {
990 let stage_checkpoint = self
991 .blockchain_db()
992 .get_stage_checkpoint(stage_id)?
993 .unwrap_or_default()
994 .block_number;
995
996 debug!(
999 target: "consensus::engine",
1000 first_stage_id = %first_stage,
1001 first_stage_checkpoint,
1002 stage_id = %stage_id,
1003 stage_checkpoint = stage_checkpoint,
1004 "Checking stage against first stage",
1005 );
1006 if stage_checkpoint < first_stage_checkpoint {
1007 debug!(
1008 target: "consensus::engine",
1009 first_stage_id = %first_stage,
1010 first_stage_checkpoint,
1011 inconsistent_stage_id = %stage_id,
1012 inconsistent_stage_checkpoint = stage_checkpoint,
1013 "Pipeline sync progress is inconsistent"
1014 );
1015 return self.blockchain_db().block_hash(first_stage_checkpoint);
1016 }
1017 }
1018
1019 self.ensure_chain_specific_db_checks()?;
1020
1021 Ok(None)
1022 }
1023
1024 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
1026 self.right().db_provider_container.metrics_sender.clone()
1027 }
1028
1029 pub const fn components(&self) -> &CB::Components {
1031 &self.node_adapter().components
1032 }
1033
1034 #[allow(clippy::type_complexity)]
1036 pub async fn launch_exex(
1037 &self,
1038 installed_exex: Vec<(
1039 String,
1040 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1041 )>,
1042 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1043 ExExLauncher::new(
1044 self.head(),
1045 self.node_adapter().clone(),
1046 installed_exex,
1047 self.configs().clone(),
1048 )
1049 .launch()
1050 .await
1051 }
1052
1053 pub fn era_import_source(&self) -> Option<EraImportSource> {
1057 let node_config = self.node_config();
1058 if !node_config.era.enabled {
1059 return None;
1060 }
1061
1062 EraImportSource::maybe_new(
1063 node_config.era.source.path.clone(),
1064 node_config.era.source.url.clone(),
1065 || node_config.chain.chain().kind().default_era_host(),
1066 || node_config.datadir().data_dir().join("era").into(),
1067 )
1068 }
1069
1070 pub fn consensus_layer_events(
1078 &self,
1079 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1080 where
1081 T::Provider: reth_provider::CanonChainTracker,
1082 {
1083 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1084 Either::Left(
1085 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1086 .map(Into::into),
1087 )
1088 } else {
1089 Either::Right(stream::empty())
1090 }
1091 }
1092
1093 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1095 where
1096 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1097 + Send
1098 + Unpin
1099 + 'static,
1100 {
1101 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1102
1103 let network = self.components().network().clone();
1104 let pool = self.components().pool().clone();
1105 let provider = self.node_adapter().provider.clone();
1106
1107 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1108
1109 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1110
1111 let ethstats_for_events = ethstats.clone();
1113 let task_executor = self.task_executor().clone();
1114 task_executor.spawn(Box::pin(async move {
1115 while let Some(event) = engine_events.next().await {
1116 use reth_engine_primitives::ConsensusEngineEvent;
1117 match event {
1118 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1119 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1120 let block_hash = executed.recovered_block.num_hash().hash;
1121 let block_number = executed.recovered_block.num_hash().number;
1122 if let Err(e) = ethstats_for_events
1123 .report_new_payload(block_hash, block_number, duration)
1124 .await
1125 {
1126 debug!(
1127 target: "ethstats",
1128 "Failed to report new payload: {}", e
1129 );
1130 }
1131 }
1132 _ => {
1133 }
1135 }
1136 }
1137 }));
1138
1139 task_executor.spawn(Box::pin(async move { ethstats.run().await }));
1141
1142 Ok(())
1143 }
1144}
1145
/// A pair of values, addressed as the `left` and the `right` side.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a pair from its two sides.
    pub const fn new(left: L, right: R) -> Self {
        Attached { left, right }
    }

    /// Transforms the left side with `f`, keeping the right side.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Transforms the right side with `f`, keeping the left side.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Shared access to the left side.
    pub const fn left(&self) -> &L {
        let Self { left, .. } = self;
        left
    }

    /// Shared access to the right side.
    pub const fn right(&self) -> &R {
        let Self { right, .. } = self;
        right
    }

    /// Exclusive access to the left side.
    pub const fn left_mut(&mut self) -> &mut L {
        let Self { left, .. } = self;
        left
    }

    /// Exclusive access to the right side.
    pub const fn right_mut(&mut self) -> &mut R {
        let Self { right, .. } = self;
        right
    }
}
1199
/// The two configurations the launch helpers work with: the node config and the TOML config
/// loaded from disk.
#[derive(Debug)]
pub struct WithConfigs<ChainSpec> {
    /// The node configuration (generic over the chain spec type).
    pub config: NodeConfig<ChainSpec>,
    /// The reth configuration loaded from the TOML file.
    pub toml_config: reth_config::Config,
}
1209
1210impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1211 fn clone(&self) -> Self {
1212 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1213 }
1214}
1215
/// A provider factory paired with the sender side of the stage metrics channel.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    /// Factory used to create database providers.
    provider_factory: ProviderFactory<N>,
    /// Sender for [`MetricEvent`]s.
    metrics_sender: UnboundedSender<MetricEvent>,
}
1223
/// The metered provider factory together with the blockchain provider.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    /// Provider factory and metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The blockchain provider of the node.
    blockchain_db: T::Provider,
}
1234
/// The metered provider factory together with the node adapter (which holds the built
/// components) and the head that was looked up when the components were created.
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    /// Provider factory and metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// Adapter bundling the components, the task executor and the blockchain provider.
    node_adapter: NodeAdapter<T, CB::Components>,
    /// Head block information captured when the components were built.
    head: Head,
}
1246
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// Extension given to the temporary config file.
    const EXTENSION: &str = "toml";

    /// Runs `proc` with the path `<tempdir>/<filename>.toml` inside a fresh temporary
    /// directory and removes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();

        let mut config_path = temp_dir.path().join(filename);
        config_path.set_extension(EXTENSION);

        proc(&config_path);

        temp_dir.close().unwrap()
    }

    /// With `full` pruning requested on the CLI, the config written to disk must round-trip
    /// to the same in-memory config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_pre_merge: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                bodies_pre_merge: false,
                bodies_distance: None,
                receipts_log_filter: None,
                bodies_before: None,
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut reth_config = Config::default();
            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
                .unwrap();

            let loaded_config = Config::from_path(config_path).unwrap();
            assert_eq!(reth_config, loaded_config);
        })
    }
}