1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
69 BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70 StageCheckpointReader, StaticFileProviderFactory,
71};
72use reth_prune::{PruneModes, PrunerBuilder};
73use reth_rpc_builder::config::RethRpcServerConfig;
74use reth_rpc_layer::JwtSecret;
75use reth_stages::{
76 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
77 StageId,
78};
79use reth_static_file::StaticFileProducer;
80use reth_tasks::TaskExecutor;
81use reth_tracing::tracing::{debug, error, info, warn};
82use reth_transaction_pool::TransactionPool;
83use std::{sync::Arc, thread::available_parallelism};
84use tokio::sync::{
85 mpsc::{unbounded_channel, UnboundedSender},
86 oneshot, watch,
87};
88
89use futures::{future::Either, stream, Stream, StreamExt};
90use reth_node_ethstats::EthStatsService;
91use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
92
93#[derive(Debug, Clone)]
112pub struct LaunchContext {
113 pub task_executor: TaskExecutor,
115 pub data_dir: ChainPath<DataDirPath>,
117}
118
119impl LaunchContext {
120 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
122 Self { task_executor, data_dir }
123 }
124
125 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
127 LaunchContextWith { inner: self, attachment }
128 }
129
130 pub fn with_loaded_toml_config<ChainSpec>(
135 self,
136 config: NodeConfig<ChainSpec>,
137 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
138 where
139 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
140 {
141 let toml_config = self.load_toml_config(&config)?;
142 Ok(self.with(WithConfigs { config, toml_config }))
143 }
144
145 pub fn load_toml_config<ChainSpec>(
150 &self,
151 config: &NodeConfig<ChainSpec>,
152 ) -> eyre::Result<reth_config::Config>
153 where
154 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
155 {
156 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
157
158 let mut toml_config = reth_config::Config::from_path(&config_path)
159 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
160
161 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
162
163 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
164
165 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
167
168 Ok(toml_config)
169 }
170
171 fn save_pruning_config<ChainSpec>(
174 reth_config: &mut reth_config::Config,
175 config: &NodeConfig<ChainSpec>,
176 config_path: impl AsRef<std::path::Path>,
177 ) -> eyre::Result<()>
178 where
179 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
180 {
181 if let Some(prune_config) = config.prune_config() {
182 if reth_config.prune != prune_config {
183 reth_config.set_prune_config(prune_config);
184 info!(target: "reth::cli", "Saving prune config to toml file");
185 reth_config.save(config_path.as_ref())?;
186 }
187 } else if !reth_config.prune.is_default() {
188 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
189 }
190 Ok(())
191 }
192
193 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
195 self.configure_globals(reserved_cpu_cores);
196 self
197 }
198
199 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
206 match fdlimit::raise_fd_limit() {
209 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
210 debug!(from, to, "Raised file descriptor limit");
211 }
212 Ok(fdlimit::Outcome::Unsupported) => {}
213 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
214 }
215
216 let num_threads = available_parallelism()
220 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
221 if let Err(err) = ThreadPoolBuilder::new()
222 .num_threads(num_threads)
223 .thread_name(|i| format!("reth-rayon-{i}"))
224 .build_global()
225 {
226 warn!(%err, "Failed to build global thread pool")
227 }
228 }
229}
230
231#[derive(Debug, Clone)]
242pub struct LaunchContextWith<T> {
243 pub inner: LaunchContext,
245 pub attachment: T,
247}
248
249impl<T> LaunchContextWith<T> {
250 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
255 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
256 }
257
258 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
260 &self.inner.data_dir
261 }
262
263 pub const fn task_executor(&self) -> &TaskExecutor {
265 &self.inner.task_executor
266 }
267
268 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
270 LaunchContextWith {
271 inner: self.inner,
272 attachment: Attached::new(self.attachment, attachment),
273 }
274 }
275
276 pub fn inspect<F>(self, f: F) -> Self
279 where
280 F: FnOnce(&Self),
281 {
282 f(&self);
283 self
284 }
285}
286
287impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
288 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
290 if !self.attachment.config.network.trusted_peers.is_empty() {
291 info!(target: "reth::cli", "Adding trusted nodes");
292
293 self.attachment
294 .toml_config
295 .peers
296 .trusted_nodes
297 .extend(self.attachment.config.network.trusted_peers.clone());
298 }
299 Ok(self)
300 }
301}
302
303impl<L, R> LaunchContextWith<Attached<L, R>> {
304 pub const fn left(&self) -> &L {
306 &self.attachment.left
307 }
308
309 pub const fn right(&self) -> &R {
311 &self.attachment.right
312 }
313
314 pub const fn left_mut(&mut self) -> &mut L {
316 &mut self.attachment.left
317 }
318
319 pub const fn right_mut(&mut self) -> &mut R {
321 &mut self.attachment.right
322 }
323}
324impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
325 pub fn with_adjusted_configs(self) -> Self {
331 self.ensure_etl_datadir().with_adjusted_instance_ports()
332 }
333
334 pub fn ensure_etl_datadir(mut self) -> Self {
336 if self.toml_config_mut().stages.etl.dir.is_none() {
337 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
338 if etl_path.exists() {
339 if let Err(err) = fs::remove_dir_all(&etl_path) {
341 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
342 }
343 }
344 self.toml_config_mut().stages.etl.dir = Some(etl_path);
345 }
346
347 self
348 }
349
350 pub fn with_adjusted_instance_ports(mut self) -> Self {
352 self.node_config_mut().adjust_instance_ports();
353 self
354 }
355
356 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
358 self.attachment.left()
359 }
360
361 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
363 &self.left().config
364 }
365
366 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
368 &mut self.left_mut().config
369 }
370
371 pub const fn toml_config(&self) -> &reth_config::Config {
373 &self.left().toml_config
374 }
375
376 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
378 &mut self.left_mut().toml_config
379 }
380
381 pub fn chain_spec(&self) -> Arc<ChainSpec> {
383 self.node_config().chain.clone()
384 }
385
386 pub fn genesis_hash(&self) -> B256 {
388 self.node_config().chain.genesis_hash()
389 }
390
391 pub fn chain_id(&self) -> Chain {
393 self.node_config().chain.chain()
394 }
395
396 pub const fn is_dev(&self) -> bool {
398 self.node_config().dev.dev
399 }
400
401 pub fn prune_config(&self) -> PruneConfig
405 where
406 ChainSpec: reth_chainspec::EthereumHardforks,
407 {
408 let toml_config = self.toml_config().prune.clone();
409 let Some(mut node_prune_config) = self.node_config().prune_config() else {
410 return toml_config;
412 };
413
414 node_prune_config.merge(toml_config);
416 node_prune_config
417 }
418
419 pub fn prune_modes(&self) -> PruneModes
421 where
422 ChainSpec: reth_chainspec::EthereumHardforks,
423 {
424 self.prune_config().segments
425 }
426
427 pub fn pruner_builder(&self) -> PrunerBuilder
429 where
430 ChainSpec: reth_chainspec::EthereumHardforks,
431 {
432 PrunerBuilder::new(self.prune_config())
433 }
434
435 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
437 let default_jwt_path = self.data_dir().jwt();
438 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
439 Ok(secret)
440 }
441
442 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
444 where
445 Pool: TransactionPool + Unpin,
446 {
447 if let Some(interval) = self.node_config().dev.block_time {
448 MiningMode::interval(interval)
449 } else {
450 MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
451 }
452 }
453}
454
455impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
456where
457 DB: Database + Clone + 'static,
458 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
459{
460 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
464 where
465 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
466 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
467 {
468 let factory = ProviderFactory::new(
469 self.right().clone(),
470 self.chain_spec(),
471 StaticFileProvider::read_write(self.data_dir().static_files())?,
472 )
473 .with_prune_modes(self.prune_modes())
474 .with_static_files_metrics();
475
476 let has_receipt_pruning = self.toml_config().prune.has_receipts_pruning();
477
478 if let Some(unwind_target) = factory
481 .static_file_provider()
482 .check_consistency(&factory.provider()?, has_receipt_pruning)?
483 {
484 assert_ne!(
487 unwind_target,
488 PipelineTarget::Unwind(0),
489 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
490 );
491
492 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
493
494 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
495
496 let pipeline = PipelineBuilder::default()
498 .add_stages(DefaultStages::new(
499 factory.clone(),
500 tip_rx,
501 Arc::new(NoopConsensus::default()),
502 NoopHeaderDownloader::default(),
503 NoopBodiesDownloader::default(),
504 NoopEvmConfig::<Evm>::default(),
505 self.toml_config().stages.clone(),
506 self.prune_modes(),
507 None,
508 ))
509 .build(
510 factory.clone(),
511 StaticFileProducer::new(factory.clone(), self.prune_modes()),
512 );
513
514 let (tx, rx) = oneshot::channel();
516
517 self.task_executor().spawn_critical_blocking(
519 "pipeline task",
520 Box::pin(async move {
521 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
522 let _ = tx.send(result);
523 }),
524 );
525 rx.await?.inspect_err(|err| {
526 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
527 })?;
528 }
529
530 Ok(factory)
531 }
532
533 pub async fn with_provider_factory<N, Evm>(
535 self,
536 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
537 where
538 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
539 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
540 {
541 let factory = self.create_provider_factory::<N, Evm>().await?;
542 let ctx = LaunchContextWith {
543 inner: self.inner,
544 attachment: self.attachment.map_right(|_| factory),
545 };
546
547 Ok(ctx)
548 }
549}
550
551impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
552where
553 T: ProviderNodeTypes,
554{
555 pub const fn database(&self) -> &T::DB {
557 self.right().db_ref()
558 }
559
560 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
562 self.right()
563 }
564
565 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
567 self.right().static_file_provider()
568 }
569
570 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
574 self.start_prometheus_endpoint().await?;
575 Ok(self)
576 }
577
578 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
580 install_prometheus_recorder().spawn_upkeep();
582
583 let listen_addr = self.node_config().metrics.prometheus;
584 if let Some(addr) = listen_addr {
585 let config = MetricServerConfig::new(
586 addr,
587 VersionInfo {
588 version: version_metadata().cargo_pkg_version.as_ref(),
589 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
590 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
591 git_sha: version_metadata().vergen_git_sha.as_ref(),
592 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
593 build_profile: version_metadata().build_profile_name.as_ref(),
594 },
595 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
596 self.task_executor().clone(),
597 Hooks::builder()
598 .with_hook({
599 let db = self.database().clone();
600 move || db.report_metrics()
601 })
602 .with_hook({
603 let sfp = self.static_file_provider();
604 move || {
605 if let Err(error) = sfp.report_metrics() {
606 error!(%error, "Failed to report metrics for the static file provider");
607 }
608 }
609 })
610 .build(),
611 ).with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
612
613 MetricServer::new(config).serve().await?;
614 }
615
616 Ok(())
617 }
618
619 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
621 init_genesis(self.provider_factory())?;
622 Ok(self)
623 }
624
625 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
627 init_genesis(self.provider_factory())
628 }
629
630 pub fn with_metrics_task(
636 self,
637 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
638 let (metrics_sender, metrics_receiver) = unbounded_channel();
639
640 let with_metrics =
641 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
642
643 debug!(target: "reth::cli", "Spawning stages metrics listener task");
644 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
645 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
646
647 LaunchContextWith {
648 inner: self.inner,
649 attachment: self.attachment.map_right(|_| with_metrics),
650 }
651 }
652}
653
654impl<N, DB>
655 LaunchContextWith<
656 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
657 >
658where
659 N: NodeTypes,
660 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
661{
662 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
664 &self.right().provider_factory
665 }
666
667 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
669 self.right().metrics_sender.clone()
670 }
671
672 #[expect(clippy::complexity)]
674 pub fn with_blockchain_db<T, F>(
675 self,
676 create_blockchain_provider: F,
677 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
678 where
679 T: FullNodeTypes<Types = N, DB = DB>,
680 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
681 {
682 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
683
684 let metered_providers = WithMeteredProviders {
685 db_provider_container: WithMeteredProvider {
686 provider_factory: self.provider_factory().clone(),
687 metrics_sender: self.sync_metrics_tx(),
688 },
689 blockchain_db,
690 };
691
692 let ctx = LaunchContextWith {
693 inner: self.inner,
694 attachment: self.attachment.map_right(|_| metered_providers),
695 };
696
697 Ok(ctx)
698 }
699}
700
701impl<T>
702 LaunchContextWith<
703 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
704 >
705where
706 T: FullNodeTypes<Types: NodeTypesForProvider>,
707{
708 pub const fn database(&self) -> &T::DB {
710 self.provider_factory().db_ref()
711 }
712
713 pub const fn provider_factory(
715 &self,
716 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
717 &self.right().db_provider_container.provider_factory
718 }
719
720 pub fn lookup_head(&self) -> eyre::Result<Head> {
724 self.node_config()
725 .lookup_head(self.provider_factory())
726 .wrap_err("the head block is missing")
727 }
728
729 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
731 self.right().db_provider_container.metrics_sender.clone()
732 }
733
734 pub const fn blockchain_db(&self) -> &T::Provider {
736 &self.right().blockchain_db
737 }
738
739 pub async fn with_components<CB>(
741 self,
742 components_builder: CB,
743 on_component_initialized: Box<
744 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
745 >,
746 ) -> eyre::Result<
747 LaunchContextWith<
748 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
749 >,
750 >
751 where
752 CB: NodeComponentsBuilder<T>,
753 {
754 let head = self.lookup_head()?;
756
757 let builder_ctx = BuilderContext::new(
758 head,
759 self.blockchain_db().clone(),
760 self.task_executor().clone(),
761 self.configs().clone(),
762 );
763
764 debug!(target: "reth::cli", "creating components");
765 let components = components_builder.build_components(&builder_ctx).await?;
766
767 let blockchain_db = self.blockchain_db().clone();
768
769 let node_adapter = NodeAdapter {
770 components,
771 task_executor: self.task_executor().clone(),
772 provider: blockchain_db,
773 };
774
775 debug!(target: "reth::cli", "calling on_component_initialized hook");
776 on_component_initialized.on_event(node_adapter.clone())?;
777
778 let components_container = WithComponents {
779 db_provider_container: WithMeteredProvider {
780 provider_factory: self.provider_factory().clone(),
781 metrics_sender: self.sync_metrics_tx(),
782 },
783 node_adapter,
784 head,
785 };
786
787 let ctx = LaunchContextWith {
788 inner: self.inner,
789 attachment: self.attachment.map_right(|_| components_container),
790 };
791
792 Ok(ctx)
793 }
794}
795
796impl<T, CB>
797 LaunchContextWith<
798 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
799 >
800where
801 T: FullNodeTypes<Types: NodeTypesForProvider>,
802 CB: NodeComponentsBuilder<T>,
803{
804 pub const fn provider_factory(
806 &self,
807 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
808 &self.right().db_provider_container.provider_factory
809 }
810
811 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
814 where
815 C: HeadersClient<Header: BlockHeader>,
816 {
817 self.node_config().max_block(client, self.provider_factory().clone()).await
818 }
819
820 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
822 self.provider_factory().static_file_provider()
823 }
824
825 pub fn static_file_producer(
827 &self,
828 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
829 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
830 }
831
832 pub const fn head(&self) -> Head {
834 self.right().head
835 }
836
837 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
839 &self.right().node_adapter
840 }
841
842 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
844 &mut self.right_mut().node_adapter
845 }
846
847 pub const fn blockchain_db(&self) -> &T::Provider {
849 &self.node_adapter().provider
850 }
851
852 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
858 let mut initial_target = self.node_config().debug.tip;
859
860 if initial_target.is_none() {
861 initial_target = self.check_pipeline_consistency()?;
862 }
863
864 Ok(initial_target)
865 }
866
867 pub const fn terminate_after_initial_backfill(&self) -> bool {
873 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
874 }
875
876 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
881 if self.chain_spec().is_optimism() &&
882 !self.is_dev() &&
883 self.chain_id() == Chain::optimism_mainnet()
884 {
885 let latest = self.blockchain_db().last_block_number()?;
886 if latest < 105235063 {
888 error!(
889 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
890 );
891 return Err(ProviderError::BestBlockNotFound)
892 }
893 }
894
895 Ok(())
896 }
897
898 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
910 let first_stage_checkpoint = self
913 .blockchain_db()
914 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
915 .unwrap_or_default()
916 .block_number;
917
918 for stage_id in StageId::ALL.iter().skip(1) {
921 let stage_checkpoint = self
922 .blockchain_db()
923 .get_stage_checkpoint(*stage_id)?
924 .unwrap_or_default()
925 .block_number;
926
927 if stage_checkpoint < first_stage_checkpoint {
930 debug!(
931 target: "consensus::engine",
932 first_stage_checkpoint,
933 inconsistent_stage_id = %stage_id,
934 inconsistent_stage_checkpoint = stage_checkpoint,
935 "Pipeline sync progress is inconsistent"
936 );
937 return self.blockchain_db().block_hash(first_stage_checkpoint);
938 }
939 }
940
941 self.ensure_chain_specific_db_checks()?;
942
943 Ok(None)
944 }
945
946 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
948 self.right().db_provider_container.metrics_sender.clone()
949 }
950
951 pub const fn components(&self) -> &CB::Components {
953 &self.node_adapter().components
954 }
955
956 #[allow(clippy::type_complexity)]
958 pub async fn launch_exex(
959 &self,
960 installed_exex: Vec<(
961 String,
962 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
963 )>,
964 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
965 ExExLauncher::new(
966 self.head(),
967 self.node_adapter().clone(),
968 installed_exex,
969 self.configs().clone(),
970 )
971 .launch()
972 .await
973 }
974
975 pub fn era_import_source(&self) -> Option<EraImportSource> {
979 let node_config = self.node_config();
980 if !node_config.era.enabled {
981 return None;
982 }
983
984 EraImportSource::maybe_new(
985 node_config.era.source.path.clone(),
986 node_config.era.source.url.clone(),
987 || node_config.chain.chain().kind().default_era_host(),
988 || node_config.datadir().data_dir().join("era").into(),
989 )
990 }
991
992 pub fn consensus_layer_events(
1000 &self,
1001 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1002 where
1003 T::Provider: reth_provider::CanonChainTracker,
1004 {
1005 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1006 Either::Left(
1007 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1008 .map(Into::into),
1009 )
1010 } else {
1011 Either::Right(stream::empty())
1012 }
1013 }
1014
1015 pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1017 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1018
1019 let network = self.components().network().clone();
1020 let pool = self.components().pool().clone();
1021 let provider = self.node_adapter().provider.clone();
1022
1023 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1024
1025 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1026 tokio::spawn(async move { ethstats.run().await });
1027
1028 Ok(())
1029 }
1030}
1031
/// A pair of two values, referred to as the left and the right side.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new pair from its two sides.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Transforms the left side with `f`, keeping the right side.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Transforms the right side with `f`, keeping the left side.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Returns a reference to the left side.
    pub const fn left(&self) -> &L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a reference to the right side.
    pub const fn right(&self) -> &R {
        let Self { right, .. } = self;
        right
    }

    /// Returns a mutable reference to the left side.
    pub const fn left_mut(&mut self) -> &mut L {
        let Self { left, .. } = self;
        left
    }

    /// Returns a mutable reference to the right side.
    pub const fn right_mut(&mut self) -> &mut R {
        let Self { right, .. } = self;
        right
    }
}
1085
1086#[derive(Debug)]
1089pub struct WithConfigs<ChainSpec> {
1090 pub config: NodeConfig<ChainSpec>,
1092 pub toml_config: reth_config::Config,
1094}
1095
1096impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1097 fn clone(&self) -> Self {
1098 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1099 }
1100}
1101
1102#[derive(Debug, Clone)]
1105pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1106 provider_factory: ProviderFactory<N>,
1107 metrics_sender: UnboundedSender<MetricEvent>,
1108}
1109
1110#[expect(missing_debug_implementations)]
1113pub struct WithMeteredProviders<T>
1114where
1115 T: FullNodeTypes,
1116{
1117 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1118 blockchain_db: T::Provider,
1119}
1120
1121#[expect(missing_debug_implementations)]
1123pub struct WithComponents<T, CB>
1124where
1125 T: FullNodeTypes,
1126 CB: NodeComponentsBuilder<T>,
1127{
1128 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1129 node_adapter: NodeAdapter<T, CB::Components>,
1130 head: Head,
1131}
1132
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    const EXTENSION: &str = "toml";

    /// Runs `proc` with a config path inside a fresh temporary directory and removes the
    /// directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();
        let mut config_path = temp_dir.path().join(filename);
        config_path.set_extension(EXTENSION);

        proc(&config_path);

        temp_dir.close().unwrap()
    }

    /// Saving the prune config must write a file that loads back to the in-memory config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let mut reth_config = Config::default();

            // Only `full` is enabled; every other pruning argument is left unset.
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_pre_merge: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                bodies_pre_merge: false,
                bodies_distance: None,
                #[expect(deprecated)]
                receipts_log_filter: None,
                bodies_before: None,
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
                .unwrap();

            let reloaded = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, reloaded);
        })
    }
}
1188}