1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
69 BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70 StageCheckpointReader, StaticFileProviderBuilder, StaticFileProviderFactory,
71};
72use reth_prune::{PruneModes, PrunerBuilder};
73use reth_rpc_builder::config::RethRpcServerConfig;
74use reth_rpc_layer::JwtSecret;
75use reth_stages::{
76 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
77 StageId,
78};
79use reth_static_file::StaticFileProducer;
80use reth_tasks::TaskExecutor;
81use reth_tracing::tracing::{debug, error, info, warn};
82use reth_transaction_pool::TransactionPool;
83use std::{sync::Arc, thread::available_parallelism};
84use tokio::sync::{
85 mpsc::{unbounded_channel, UnboundedSender},
86 oneshot, watch,
87};
88
89use futures::{future::Either, stream, Stream, StreamExt};
90use reth_node_ethstats::EthStatsService;
91use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
92
93#[derive(Debug, Clone)]
112pub struct LaunchContext {
113 pub task_executor: TaskExecutor,
115 pub data_dir: ChainPath<DataDirPath>,
117}
118
119impl LaunchContext {
120 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
122 Self { task_executor, data_dir }
123 }
124
125 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
127 LaunchContextWith { inner: self, attachment }
128 }
129
130 pub fn with_loaded_toml_config<ChainSpec>(
135 self,
136 config: NodeConfig<ChainSpec>,
137 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
138 where
139 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
140 {
141 let toml_config = self.load_toml_config(&config)?;
142 Ok(self.with(WithConfigs { config, toml_config }))
143 }
144
145 pub fn load_toml_config<ChainSpec>(
150 &self,
151 config: &NodeConfig<ChainSpec>,
152 ) -> eyre::Result<reth_config::Config>
153 where
154 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
155 {
156 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
157
158 let mut toml_config = reth_config::Config::from_path(&config_path)
159 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
160
161 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
162
163 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
164
165 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
167
168 toml_config.static_files = config.static_files.merge_with_config(toml_config.static_files);
170
171 Ok(toml_config)
172 }
173
174 fn save_pruning_config<ChainSpec>(
177 reth_config: &mut reth_config::Config,
178 config: &NodeConfig<ChainSpec>,
179 config_path: impl AsRef<std::path::Path>,
180 ) -> eyre::Result<()>
181 where
182 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
183 {
184 let mut should_save = reth_config.prune.segments.migrate();
185
186 if let Some(prune_config) = config.prune_config() {
187 if reth_config.prune != prune_config {
188 reth_config.set_prune_config(prune_config);
189 should_save = true;
190 }
191 } else if !reth_config.prune.is_default() {
192 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
193 }
194
195 if should_save {
196 info!(target: "reth::cli", "Saving prune config to toml file");
197 reth_config.save(config_path.as_ref())?;
198 }
199
200 Ok(())
201 }
202
203 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
205 self.configure_globals(reserved_cpu_cores);
206 self
207 }
208
209 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
216 match fdlimit::raise_fd_limit() {
219 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
220 debug!(from, to, "Raised file descriptor limit");
221 }
222 Ok(fdlimit::Outcome::Unsupported) => {}
223 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
224 }
225
226 let num_threads = available_parallelism()
230 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
231 if let Err(err) = ThreadPoolBuilder::new()
232 .num_threads(num_threads)
233 .thread_name(|i| format!("reth-rayon-{i}"))
234 .build_global()
235 {
236 warn!(%err, "Failed to build global thread pool")
237 }
238 }
239}
240
241#[derive(Debug, Clone)]
252pub struct LaunchContextWith<T> {
253 pub inner: LaunchContext,
255 pub attachment: T,
257}
258
259impl<T> LaunchContextWith<T> {
260 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
265 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
266 }
267
268 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
270 &self.inner.data_dir
271 }
272
273 pub const fn task_executor(&self) -> &TaskExecutor {
275 &self.inner.task_executor
276 }
277
278 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
280 LaunchContextWith {
281 inner: self.inner,
282 attachment: Attached::new(self.attachment, attachment),
283 }
284 }
285
286 pub fn inspect<F>(self, f: F) -> Self
289 where
290 F: FnOnce(&Self),
291 {
292 f(&self);
293 self
294 }
295}
296
297impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
298 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
300 if !self.attachment.config.network.trusted_peers.is_empty() {
301 info!(target: "reth::cli", "Adding trusted nodes");
302
303 self.attachment
304 .toml_config
305 .peers
306 .trusted_nodes
307 .extend(self.attachment.config.network.trusted_peers.clone());
308 }
309 Ok(self)
310 }
311}
312
313impl<L, R> LaunchContextWith<Attached<L, R>> {
314 pub const fn left(&self) -> &L {
316 &self.attachment.left
317 }
318
319 pub const fn right(&self) -> &R {
321 &self.attachment.right
322 }
323
324 pub const fn left_mut(&mut self) -> &mut L {
326 &mut self.attachment.left
327 }
328
329 pub const fn right_mut(&mut self) -> &mut R {
331 &mut self.attachment.right
332 }
333}
334impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
335 pub fn with_adjusted_configs(self) -> Self {
341 self.ensure_etl_datadir().with_adjusted_instance_ports()
342 }
343
344 pub fn ensure_etl_datadir(mut self) -> Self {
346 if self.toml_config_mut().stages.etl.dir.is_none() {
347 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
348 if etl_path.exists() {
349 if let Err(err) = fs::remove_dir_all(&etl_path) {
351 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
352 }
353 }
354 self.toml_config_mut().stages.etl.dir = Some(etl_path);
355 }
356
357 self
358 }
359
360 pub fn with_adjusted_instance_ports(mut self) -> Self {
362 self.node_config_mut().adjust_instance_ports();
363 self
364 }
365
366 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
368 self.attachment.left()
369 }
370
371 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
373 &self.left().config
374 }
375
376 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
378 &mut self.left_mut().config
379 }
380
381 pub const fn toml_config(&self) -> &reth_config::Config {
383 &self.left().toml_config
384 }
385
386 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
388 &mut self.left_mut().toml_config
389 }
390
391 pub fn chain_spec(&self) -> Arc<ChainSpec> {
393 self.node_config().chain.clone()
394 }
395
396 pub fn genesis_hash(&self) -> B256 {
398 self.node_config().chain.genesis_hash()
399 }
400
401 pub fn chain_id(&self) -> Chain {
403 self.node_config().chain.chain()
404 }
405
406 pub const fn is_dev(&self) -> bool {
408 self.node_config().dev.dev
409 }
410
411 pub fn prune_config(&self) -> PruneConfig
415 where
416 ChainSpec: reth_chainspec::EthereumHardforks,
417 {
418 let Some(mut node_prune_config) = self.node_config().prune_config() else {
419 return self.toml_config().prune.clone();
421 };
422
423 node_prune_config.merge(self.toml_config().prune.clone());
425 node_prune_config
426 }
427
428 pub fn prune_modes(&self) -> PruneModes
430 where
431 ChainSpec: reth_chainspec::EthereumHardforks,
432 {
433 self.prune_config().segments
434 }
435
436 pub fn pruner_builder(&self) -> PrunerBuilder
438 where
439 ChainSpec: reth_chainspec::EthereumHardforks,
440 {
441 PrunerBuilder::new(self.prune_config())
442 }
443
444 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
446 let default_jwt_path = self.data_dir().jwt();
447 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
448 Ok(secret)
449 }
450
451 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
453 where
454 Pool: TransactionPool + Unpin,
455 {
456 if let Some(interval) = self.node_config().dev.block_time {
457 MiningMode::interval(interval)
458 } else {
459 MiningMode::instant(pool, self.node_config().dev.block_max_transactions)
460 }
461 }
462}
463
464impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
465where
466 DB: Database + Clone + 'static,
467 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
468{
469 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
473 where
474 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
475 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
476 {
477 let static_files_config = &self.toml_config().static_files;
479 static_files_config.validate()?;
480
481 let static_file_provider =
483 StaticFileProviderBuilder::read_write(self.data_dir().static_files())?
484 .with_metrics()
485 .with_blocks_per_file_for_segments(static_files_config.as_blocks_per_file_map())
486 .build()?;
487
488 let factory =
489 ProviderFactory::new(self.right().clone(), self.chain_spec(), static_file_provider)?
490 .with_prune_modes(self.prune_modes());
491
492 if let Some(unwind_target) =
495 factory.static_file_provider().check_consistency(&factory.provider()?)?
496 {
497 assert_ne!(
500 unwind_target,
501 PipelineTarget::Unwind(0),
502 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
503 );
504
505 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
506
507 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
508
509 let pipeline = PipelineBuilder::default()
511 .add_stages(DefaultStages::new(
512 factory.clone(),
513 tip_rx,
514 Arc::new(NoopConsensus::default()),
515 NoopHeaderDownloader::default(),
516 NoopBodiesDownloader::default(),
517 NoopEvmConfig::<Evm>::default(),
518 self.toml_config().stages.clone(),
519 self.prune_modes(),
520 None,
521 ))
522 .build(
523 factory.clone(),
524 StaticFileProducer::new(factory.clone(), self.prune_modes()),
525 );
526
527 let (tx, rx) = oneshot::channel();
529
530 self.task_executor().spawn_critical_blocking(
532 "pipeline task",
533 Box::pin(async move {
534 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
535 let _ = tx.send(result);
536 }),
537 );
538 rx.await?.inspect_err(|err| {
539 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
540 })?;
541 }
542
543 Ok(factory)
544 }
545
546 pub async fn with_provider_factory<N, Evm>(
548 self,
549 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
550 where
551 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
552 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
553 {
554 let factory = self.create_provider_factory::<N, Evm>().await?;
555 let ctx = LaunchContextWith {
556 inner: self.inner,
557 attachment: self.attachment.map_right(|_| factory),
558 };
559
560 Ok(ctx)
561 }
562}
563
564impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
565where
566 T: ProviderNodeTypes,
567{
568 pub const fn database(&self) -> &T::DB {
570 self.right().db_ref()
571 }
572
573 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
575 self.right()
576 }
577
578 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
580 self.right().static_file_provider()
581 }
582
583 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
587 self.start_prometheus_endpoint().await?;
588 Ok(self)
589 }
590
591 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
593 install_prometheus_recorder().spawn_upkeep();
595
596 let listen_addr = self.node_config().metrics.prometheus;
597 if let Some(addr) = listen_addr {
598 let config = MetricServerConfig::new(
599 addr,
600 VersionInfo {
601 version: version_metadata().cargo_pkg_version.as_ref(),
602 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
603 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
604 git_sha: version_metadata().vergen_git_sha.as_ref(),
605 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
606 build_profile: version_metadata().build_profile_name.as_ref(),
607 },
608 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
609 self.task_executor().clone(),
610 Hooks::builder()
611 .with_hook({
612 let db = self.database().clone();
613 move || db.report_metrics()
614 })
615 .with_hook({
616 let sfp = self.static_file_provider();
617 move || {
618 if let Err(error) = sfp.report_metrics() {
619 error!(%error, "Failed to report metrics for the static file provider");
620 }
621 }
622 })
623 .build(),
624 ).with_push_gateway(self.node_config().metrics.push_gateway_url.clone(), self.node_config().metrics.push_gateway_interval);
625
626 MetricServer::new(config).serve().await?;
627 }
628
629 Ok(())
630 }
631
632 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
634 init_genesis_with_settings(
635 self.provider_factory(),
636 self.node_config().static_files.to_settings(),
637 )?;
638 Ok(self)
639 }
640
641 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
643 init_genesis_with_settings(
644 self.provider_factory(),
645 self.node_config().static_files.to_settings(),
646 )
647 }
648
649 pub fn with_metrics_task(
655 self,
656 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
657 let (metrics_sender, metrics_receiver) = unbounded_channel();
658
659 let with_metrics =
660 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
661
662 debug!(target: "reth::cli", "Spawning stages metrics listener task");
663 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
664 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
665
666 LaunchContextWith {
667 inner: self.inner,
668 attachment: self.attachment.map_right(|_| with_metrics),
669 }
670 }
671}
672
673impl<N, DB>
674 LaunchContextWith<
675 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
676 >
677where
678 N: NodeTypes,
679 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
680{
681 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
683 &self.right().provider_factory
684 }
685
686 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
688 self.right().metrics_sender.clone()
689 }
690
691 #[expect(clippy::complexity)]
693 pub fn with_blockchain_db<T, F>(
694 self,
695 create_blockchain_provider: F,
696 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
697 where
698 T: FullNodeTypes<Types = N, DB = DB>,
699 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
700 {
701 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
702
703 let metered_providers = WithMeteredProviders {
704 db_provider_container: WithMeteredProvider {
705 provider_factory: self.provider_factory().clone(),
706 metrics_sender: self.sync_metrics_tx(),
707 },
708 blockchain_db,
709 };
710
711 let ctx = LaunchContextWith {
712 inner: self.inner,
713 attachment: self.attachment.map_right(|_| metered_providers),
714 };
715
716 Ok(ctx)
717 }
718}
719
720impl<T>
721 LaunchContextWith<
722 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
723 >
724where
725 T: FullNodeTypes<Types: NodeTypesForProvider>,
726{
727 pub const fn database(&self) -> &T::DB {
729 self.provider_factory().db_ref()
730 }
731
732 pub const fn provider_factory(
734 &self,
735 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
736 &self.right().db_provider_container.provider_factory
737 }
738
739 pub fn lookup_head(&self) -> eyre::Result<Head> {
743 self.node_config()
744 .lookup_head(self.provider_factory())
745 .wrap_err("the head block is missing")
746 }
747
748 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
750 self.right().db_provider_container.metrics_sender.clone()
751 }
752
753 pub const fn blockchain_db(&self) -> &T::Provider {
755 &self.right().blockchain_db
756 }
757
758 pub async fn with_components<CB>(
760 self,
761 components_builder: CB,
762 on_component_initialized: Box<
763 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
764 >,
765 ) -> eyre::Result<
766 LaunchContextWith<
767 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
768 >,
769 >
770 where
771 CB: NodeComponentsBuilder<T>,
772 {
773 let head = self.lookup_head()?;
775
776 let builder_ctx = BuilderContext::new(
777 head,
778 self.blockchain_db().clone(),
779 self.task_executor().clone(),
780 self.configs().clone(),
781 );
782
783 debug!(target: "reth::cli", "creating components");
784 let components = components_builder.build_components(&builder_ctx).await?;
785
786 let blockchain_db = self.blockchain_db().clone();
787
788 let node_adapter = NodeAdapter {
789 components,
790 task_executor: self.task_executor().clone(),
791 provider: blockchain_db,
792 };
793
794 debug!(target: "reth::cli", "calling on_component_initialized hook");
795 on_component_initialized.on_event(node_adapter.clone())?;
796
797 let components_container = WithComponents {
798 db_provider_container: WithMeteredProvider {
799 provider_factory: self.provider_factory().clone(),
800 metrics_sender: self.sync_metrics_tx(),
801 },
802 node_adapter,
803 head,
804 };
805
806 let ctx = LaunchContextWith {
807 inner: self.inner,
808 attachment: self.attachment.map_right(|_| components_container),
809 };
810
811 Ok(ctx)
812 }
813}
814
815impl<T, CB>
816 LaunchContextWith<
817 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
818 >
819where
820 T: FullNodeTypes<Types: NodeTypesForProvider>,
821 CB: NodeComponentsBuilder<T>,
822{
823 pub const fn provider_factory(
825 &self,
826 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
827 &self.right().db_provider_container.provider_factory
828 }
829
830 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
833 where
834 C: HeadersClient<Header: BlockHeader>,
835 {
836 self.node_config().max_block(client, self.provider_factory().clone()).await
837 }
838
839 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
841 self.provider_factory().static_file_provider()
842 }
843
844 pub fn static_file_producer(
846 &self,
847 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
848 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
849 }
850
851 pub const fn head(&self) -> Head {
853 self.right().head
854 }
855
856 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
858 &self.right().node_adapter
859 }
860
861 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
863 &mut self.right_mut().node_adapter
864 }
865
866 pub const fn blockchain_db(&self) -> &T::Provider {
868 &self.node_adapter().provider
869 }
870
871 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
877 let mut initial_target = self.node_config().debug.tip;
878
879 if initial_target.is_none() {
880 initial_target = self.check_pipeline_consistency()?;
881 }
882
883 Ok(initial_target)
884 }
885
886 pub const fn terminate_after_initial_backfill(&self) -> bool {
892 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
893 }
894
895 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
900 if self.chain_spec().is_optimism() &&
901 !self.is_dev() &&
902 self.chain_id() == Chain::optimism_mainnet()
903 {
904 let latest = self.blockchain_db().last_block_number()?;
905 if latest < 105235063 {
907 error!(
908 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
909 );
910 return Err(ProviderError::BestBlockNotFound)
911 }
912 }
913
914 Ok(())
915 }
916
917 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
929 let first_stage_checkpoint = self
932 .blockchain_db()
933 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
934 .unwrap_or_default()
935 .block_number;
936
937 for stage_id in StageId::ALL.iter().skip(1) {
940 let stage_checkpoint = self
941 .blockchain_db()
942 .get_stage_checkpoint(*stage_id)?
943 .unwrap_or_default()
944 .block_number;
945
946 if stage_checkpoint < first_stage_checkpoint {
949 debug!(
950 target: "consensus::engine",
951 first_stage_checkpoint,
952 inconsistent_stage_id = %stage_id,
953 inconsistent_stage_checkpoint = stage_checkpoint,
954 "Pipeline sync progress is inconsistent"
955 );
956 return self.blockchain_db().block_hash(first_stage_checkpoint);
957 }
958 }
959
960 self.ensure_chain_specific_db_checks()?;
961
962 Ok(None)
963 }
964
965 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
967 self.right().db_provider_container.metrics_sender.clone()
968 }
969
970 pub const fn components(&self) -> &CB::Components {
972 &self.node_adapter().components
973 }
974
975 #[allow(clippy::type_complexity)]
977 pub async fn launch_exex(
978 &self,
979 installed_exex: Vec<(
980 String,
981 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
982 )>,
983 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
984 ExExLauncher::new(
985 self.head(),
986 self.node_adapter().clone(),
987 installed_exex,
988 self.configs().clone(),
989 )
990 .launch()
991 .await
992 }
993
994 pub fn era_import_source(&self) -> Option<EraImportSource> {
998 let node_config = self.node_config();
999 if !node_config.era.enabled {
1000 return None;
1001 }
1002
1003 EraImportSource::maybe_new(
1004 node_config.era.source.path.clone(),
1005 node_config.era.source.url.clone(),
1006 || node_config.chain.chain().kind().default_era_host(),
1007 || node_config.datadir().data_dir().join("era").into(),
1008 )
1009 }
1010
1011 pub fn consensus_layer_events(
1019 &self,
1020 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1021 where
1022 T::Provider: reth_provider::CanonChainTracker,
1023 {
1024 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1025 Either::Left(
1026 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1027 .map(Into::into),
1028 )
1029 } else {
1030 Either::Right(stream::empty())
1031 }
1032 }
1033
1034 pub async fn spawn_ethstats(&self) -> eyre::Result<()> {
1036 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1037
1038 let network = self.components().network().clone();
1039 let pool = self.components().pool().clone();
1040 let provider = self.node_adapter().provider.clone();
1041
1042 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1043
1044 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1045 tokio::spawn(async move { ethstats.run().await });
1046
1047 Ok(())
1048 }
1049}
1050
/// A pair of two values, referred to as the left and the right side.
///
/// Either side can be borrowed, mutated in place, or replaced by mapping it to a new value
/// (possibly of a different type) while the other side is kept as is.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Attached<L, R> {
    /// The left value.
    left: L,
    /// The right value.
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a pair from a left and a right value.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Consumes the pair and applies `f` to the left value; the right value is kept.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Consumes the pair and applies `f` to the right value; the left value is kept.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Returns a shared reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Returns a shared reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Returns an exclusive reference to the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Returns an exclusive reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}
1104
1105#[derive(Debug)]
1108pub struct WithConfigs<ChainSpec> {
1109 pub config: NodeConfig<ChainSpec>,
1111 pub toml_config: reth_config::Config,
1113}
1114
1115impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1116 fn clone(&self) -> Self {
1117 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1118 }
1119}
1120
1121#[derive(Debug, Clone)]
1124pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1125 provider_factory: ProviderFactory<N>,
1126 metrics_sender: UnboundedSender<MetricEvent>,
1127}
1128
1129#[expect(missing_debug_implementations)]
1132pub struct WithMeteredProviders<T>
1133where
1134 T: FullNodeTypes,
1135{
1136 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1137 blockchain_db: T::Provider,
1138}
1139
1140#[expect(missing_debug_implementations)]
1142pub struct WithComponents<T, CB>
1143where
1144 T: FullNodeTypes,
1145 CB: NodeComponentsBuilder<T>,
1146{
1147 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1148 node_adapter: NodeAdapter<T, CB::Components>,
1149 head: Head,
1150}
1151
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// Extension given to the temporary config file.
    const EXTENSION: &str = "toml";

    /// Runs `proc` with the path `<tempdir>/<filename>.toml` inside a fresh temporary
    /// directory and removes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(filename).with_extension(EXTENSION);
        proc(&path);
        dir.close().unwrap()
    }

    /// Saving the prune config must write a file that loads back to the same config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            // Only `full` is enabled; every other pruning option is left off.
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_pre_merge: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                bodies_pre_merge: false,
                bodies_distance: None,
                receipts_log_filter: None,
                bodies_before: None,
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut reth_config = Config::default();
            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
                .unwrap();

            let loaded_config = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, loaded_config);
        })
    }
}