1use crate::{
33 components::{NodeComponents, NodeComponentsBuilder},
34 hooks::OnComponentInitializedHook,
35 BuilderContext, ExExLauncher, NodeAdapter, PrimitivesTy,
36};
37use alloy_eips::eip2124::Head;
38use alloy_primitives::{BlockNumber, B256};
39use eyre::Context;
40use rayon::ThreadPoolBuilder;
41use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
42use reth_config::{config::EtlConfig, PruneConfig};
43use reth_consensus::noop::NoopConsensus;
44use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
45use reth_db_common::init::{init_genesis_with_settings, InitStorageError};
46use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
47use reth_engine_local::MiningMode;
48use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
49use reth_exex::ExExManagerHandle;
50use reth_fs_util as fs;
51use reth_network_p2p::headers::client::HeadersClient;
52use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
53use reth_node_core::{
54 args::DefaultEraHost,
55 dirs::{ChainPath, DataDirPath},
56 node_config::NodeConfig,
57 primitives::BlockHeader,
58 version::version_metadata,
59};
60use reth_node_metrics::{
61 chain::ChainSpecInfo,
62 hooks::Hooks,
63 recorder::install_prometheus_recorder,
64 server::{MetricServer, MetricServerConfig},
65 version::VersionInfo,
66};
67use reth_provider::{
68 providers::{NodeTypesForProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider},
69 BlockHashReader, BlockNumReader, ProviderError, ProviderFactory, ProviderResult,
70 RocksDBProviderFactory, StageCheckpointReader, StaticFileProviderBuilder,
71 StaticFileProviderFactory,
72};
73use reth_prune::{PruneModes, PrunerBuilder};
74use reth_rpc_builder::config::RethRpcServerConfig;
75use reth_rpc_layer::JwtSecret;
76use reth_stages::{
77 sets::DefaultStages, stages::EraImportSource, MetricEvent, PipelineBuilder, PipelineTarget,
78 StageId,
79};
80use reth_static_file::StaticFileProducer;
81use reth_tasks::TaskExecutor;
82use reth_tracing::{
83 throttle,
84 tracing::{debug, error, info, warn},
85};
86use reth_transaction_pool::TransactionPool;
87use reth_trie_db::ChangesetCache;
88use std::{num::NonZeroUsize, sync::Arc, thread::available_parallelism, time::Duration};
89use tokio::sync::{
90 mpsc::{unbounded_channel, UnboundedSender},
91 oneshot, watch,
92};
93
94use futures::{future::Either, stream, Stream, StreamExt};
95use reth_node_ethstats::EthStatsService;
96use reth_node_events::{cl::ConsensusLayerHealthEvents, node::NodeEvent};
97
/// Base context used while launching a node.
///
/// Holds the two handles that the launch steps in this file read from: the task executor and
/// the data directory paths.
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// Executor on which launch steps spawn their tasks.
    pub task_executor: TaskExecutor,
    /// Data directory paths of the node (config file, static files, JWT secret, ...).
    pub data_dir: ChainPath<DataDirPath>,
}
123
124impl LaunchContext {
125 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
127 Self { task_executor, data_dir }
128 }
129
130 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
132 LaunchContextWith { inner: self, attachment }
133 }
134
135 pub fn with_loaded_toml_config<ChainSpec>(
140 self,
141 config: NodeConfig<ChainSpec>,
142 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>>
143 where
144 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
145 {
146 let toml_config = self.load_toml_config(&config)?;
147 Ok(self.with(WithConfigs { config, toml_config }))
148 }
149
150 pub fn load_toml_config<ChainSpec>(
155 &self,
156 config: &NodeConfig<ChainSpec>,
157 ) -> eyre::Result<reth_config::Config>
158 where
159 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
160 {
161 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
162
163 let mut toml_config = reth_config::Config::from_path(&config_path)
164 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
165
166 Self::save_pruning_config(&mut toml_config, config, &config_path)?;
167
168 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
169
170 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
172
173 toml_config.static_files =
175 config.static_files.merge_with_config(toml_config.static_files, config.pruning.minimal);
176
177 Ok(toml_config)
178 }
179
180 fn save_pruning_config<ChainSpec>(
183 reth_config: &mut reth_config::Config,
184 config: &NodeConfig<ChainSpec>,
185 config_path: impl AsRef<std::path::Path>,
186 ) -> eyre::Result<()>
187 where
188 ChainSpec: EthChainSpec + reth_chainspec::EthereumHardforks,
189 {
190 let mut should_save = reth_config.prune.segments.migrate();
191
192 if let Some(prune_config) = config.prune_config() {
193 if reth_config.prune != prune_config {
194 reth_config.set_prune_config(prune_config);
195 should_save = true;
196 }
197 } else if !reth_config.prune.is_default() {
198 warn!(target: "reth::cli", "Pruning configuration is present in the config file, but no CLI arguments are provided. Using config from file.");
199 }
200
201 if should_save {
202 info!(target: "reth::cli", "Saving prune config to toml file");
203 reth_config.save(config_path.as_ref())?;
204 }
205
206 Ok(())
207 }
208
209 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
211 self.configure_globals(reserved_cpu_cores);
212 self
213 }
214
215 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
220 match fdlimit::raise_fd_limit() {
223 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
224 debug!(from, to, "Raised file descriptor limit");
225 }
226 Ok(fdlimit::Outcome::Unsupported) => {}
227 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
228 }
229
230 let _ = reserved_cpu_cores;
234 let num_threads = available_parallelism().map_or(1, NonZeroUsize::get);
235 if let Err(err) = ThreadPoolBuilder::new()
236 .num_threads(num_threads)
237 .thread_name(|i| format!("rayon-{i:02}"))
238 .build_global()
239 {
240 warn!(%err, "Failed to build global thread pool")
241 }
242 }
243}
244
/// A [`LaunchContext`] combined with an additional value of type `T`.
///
/// The `impl` blocks in this file provide different helpers depending on the concrete type of
/// the attachment.
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped base launch context.
    pub inner: LaunchContext,
    /// The value carried alongside the base context.
    pub attachment: T,
}
262
263impl<T> LaunchContextWith<T> {
264 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
269 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
270 }
271
272 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
274 &self.inner.data_dir
275 }
276
277 pub const fn task_executor(&self) -> &TaskExecutor {
279 &self.inner.task_executor
280 }
281
282 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
284 LaunchContextWith {
285 inner: self.inner,
286 attachment: Attached::new(self.attachment, attachment),
287 }
288 }
289
290 pub fn inspect<F>(self, f: F) -> Self
293 where
294 F: FnOnce(&Self),
295 {
296 f(&self);
297 self
298 }
299}
300
301impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
302 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
304 if !self.attachment.config.network.trusted_peers.is_empty() {
305 info!(target: "reth::cli", "Adding trusted nodes");
306
307 self.attachment
308 .toml_config
309 .peers
310 .trusted_nodes
311 .extend(self.attachment.config.network.trusted_peers.clone());
312 }
313 Ok(self)
314 }
315}
316
317impl<L, R> LaunchContextWith<Attached<L, R>> {
318 pub const fn left(&self) -> &L {
320 &self.attachment.left
321 }
322
323 pub const fn right(&self) -> &R {
325 &self.attachment.right
326 }
327
328 pub const fn left_mut(&mut self) -> &mut L {
330 &mut self.attachment.left
331 }
332
333 pub const fn right_mut(&mut self) -> &mut R {
335 &mut self.attachment.right
336 }
337}
338impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
339 pub fn with_adjusted_configs(self) -> Self {
345 self.ensure_etl_datadir().with_adjusted_instance_ports()
346 }
347
348 pub fn ensure_etl_datadir(mut self) -> Self {
350 if self.toml_config_mut().stages.etl.dir.is_none() {
351 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
352 if etl_path.exists() {
353 if let Err(err) = fs::remove_dir_all(&etl_path) {
355 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
356 }
357 }
358 self.toml_config_mut().stages.etl.dir = Some(etl_path);
359 }
360
361 self
362 }
363
364 pub fn with_adjusted_instance_ports(mut self) -> Self {
366 self.node_config_mut().adjust_instance_ports();
367 self
368 }
369
370 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
372 self.attachment.left()
373 }
374
375 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
377 &self.left().config
378 }
379
380 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
382 &mut self.left_mut().config
383 }
384
385 pub const fn toml_config(&self) -> &reth_config::Config {
387 &self.left().toml_config
388 }
389
390 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
392 &mut self.left_mut().toml_config
393 }
394
395 pub fn chain_spec(&self) -> Arc<ChainSpec> {
397 self.node_config().chain.clone()
398 }
399
400 pub fn genesis_hash(&self) -> B256 {
402 self.node_config().chain.genesis_hash()
403 }
404
405 pub fn chain_id(&self) -> Chain {
407 self.node_config().chain.chain()
408 }
409
410 pub const fn is_dev(&self) -> bool {
412 self.node_config().dev.dev
413 }
414
415 pub fn prune_config(&self) -> PruneConfig
419 where
420 ChainSpec: reth_chainspec::EthereumHardforks,
421 {
422 let Some(mut node_prune_config) = self.node_config().prune_config() else {
423 return self.toml_config().prune.clone();
425 };
426
427 node_prune_config.merge(self.toml_config().prune.clone());
429 node_prune_config
430 }
431
432 pub fn prune_modes(&self) -> PruneModes
434 where
435 ChainSpec: reth_chainspec::EthereumHardforks,
436 {
437 self.prune_config().segments
438 }
439
440 pub fn pruner_builder(&self) -> PrunerBuilder
442 where
443 ChainSpec: reth_chainspec::EthereumHardforks,
444 {
445 PrunerBuilder::new(self.prune_config())
446 }
447
448 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
450 let default_jwt_path = self.data_dir().jwt();
451 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
452 Ok(secret)
453 }
454
455 pub fn dev_mining_mode<Pool>(&self, pool: Pool) -> MiningMode<Pool>
457 where
458 Pool: TransactionPool + Unpin,
459 {
460 self.node_config().dev_mining_mode(pool)
461 }
462}
463
impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
where
    DB: Database + Clone + 'static,
    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
{
    /// Builds a [`ProviderFactory`] on top of the attached database and makes sure the storage
    /// layers are consistent with each other.
    ///
    /// Steps:
    /// 1. Validate the static-file configuration and build the static file provider.
    /// 2. Build the RocksDB provider.
    /// 3. Assemble the factory with the effective prune modes and the given `changeset_cache`.
    /// 4. Run the factory's consistency check; if it reports an unwind target, run an
    ///    unwind-only pipeline to the lowest reported block and wait for it to finish.
    ///
    /// # Errors
    ///
    /// Returns an error if validation, building any provider, the consistency check or the
    /// unwind fails, or if the unwind task drops its result channel.
    ///
    /// # Panics
    ///
    /// Panics if the consistency check would require an unwind to block 0.
    pub async fn create_provider_factory<N, Evm>(
        &self,
        changeset_cache: ChangesetCache,
    ) -> eyre::Result<ProviderFactory<N>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let static_files_config = &self.toml_config().static_files;
        // Reject an invalid static-file configuration before touching any storage.
        static_files_config.validate()?;

        let static_file_provider =
            StaticFileProviderBuilder::read_write(self.data_dir().static_files())
                .with_metrics()
                .with_blocks_per_file_for_segments(&static_files_config.as_blocks_per_file_map())
                .with_genesis_block_number(self.chain_spec().genesis().number.unwrap_or_default())
                .build()?;

        let rocksdb_provider = RocksDBProvider::builder(self.data_dir().rocksdb())
            .with_default_tables()
            .with_metrics()
            .with_statistics()
            .build()?;

        let factory = ProviderFactory::new(
            self.right().clone(),
            self.chain_spec(),
            static_file_provider,
            rocksdb_provider,
            self.task_executor().clone(),
        )?
        .with_prune_modes(self.prune_modes())
        .with_changeset_cache(changeset_cache);

        // Each entry is an optional block to unwind to, one per storage layer.
        let (rocksdb_unwind, static_file_unwind) = factory.check_consistency()?;

        // Use the lowest reported block so that a single unwind satisfies both layers.
        let unwind_target = [rocksdb_unwind, static_file_unwind].into_iter().flatten().min();

        if let Some(unwind_block) = unwind_target {
            // Only used for the log and panic messages below.
            let inconsistency_source = match (rocksdb_unwind, static_file_unwind) {
                (Some(_), Some(_)) => "RocksDB and static file",
                (Some(_), None) => "RocksDB",
                (None, Some(_)) => "static file",
                // `unwind_target` is `Some`, so at least one of the two is `Some`.
                (None, None) => unreachable!(),
            };
            // Deliberately panic rather than unwind all the way to block 0.
            assert_ne!(
                unwind_block, 0,
                "A {} inconsistency was found that would trigger an unwind to block 0",
                inconsistency_source
            );

            let unwind_target = PipelineTarget::Unwind(unwind_block);

            info!(target: "reth::cli", %unwind_target, %inconsistency_source, "Executing unwind after consistency check.");

            // The sender is kept alive (but unused) for as long as the pipeline is built.
            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

            // The pipeline is only used to unwind, hence the no-op consensus, downloaders and
            // EVM config.
            let pipeline = PipelineBuilder::default()
                .add_stages(DefaultStages::new(
                    factory.clone(),
                    tip_rx,
                    Arc::new(NoopConsensus::default()),
                    NoopHeaderDownloader::default(),
                    NoopBodiesDownloader::default(),
                    NoopEvmConfig::<Evm>::default(),
                    self.toml_config().stages.clone(),
                    self.prune_modes(),
                    None,
                ))
                .build(
                    factory.clone(),
                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
                );

            // Carries the pipeline result from the spawned task back to this function.
            let (tx, rx) = oneshot::channel();

            self.task_executor().spawn_critical_blocking_task("pipeline task", async move {
                let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
                let _ = tx.send(result);
            });
            // First `?`: the task dropped the sender. Second `?`: the pipeline itself failed.
            rx.await?.inspect_err(|err| {
                error!(target: "reth::cli", %unwind_target, %inconsistency_source, %err, "failed to run unwind")
            })?;
        }

        Ok(factory)
    }

    /// Creates the [`ProviderFactory`] via [`Self::create_provider_factory`] and replaces the
    /// attached database with it.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::create_provider_factory`].
    pub async fn with_provider_factory<N, Evm>(
        self,
        changeset_cache: ChangesetCache,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
    {
        let factory = self.create_provider_factory::<N, Evm>(changeset_cache).await?;
        let ctx = LaunchContextWith {
            inner: self.inner,
            // The database has been cloned into the factory; drop the original attachment.
            attachment: self.attachment.map_right(|_| factory),
        };

        Ok(ctx)
    }
}
590
591impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
592where
593 T: ProviderNodeTypes,
594{
595 pub const fn database(&self) -> &T::DB {
597 self.right().db_ref()
598 }
599
600 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
602 self.right()
603 }
604
605 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
607 self.right().static_file_provider()
608 }
609
610 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
614 self.start_prometheus_endpoint().await?;
615 Ok(self)
616 }
617
618 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
620 install_prometheus_recorder().spawn_upkeep();
622
623 let listen_addr = self.node_config().metrics.prometheus;
624 if let Some(addr) = listen_addr {
625 let config = MetricServerConfig::new(
626 addr,
627 VersionInfo {
628 version: version_metadata().cargo_pkg_version.as_ref(),
629 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
630 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
631 git_sha: version_metadata().vergen_git_sha.as_ref(),
632 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
633 build_profile: version_metadata().build_profile_name.as_ref(),
634 },
635 ChainSpecInfo { name: self.chain_id().to_string() },
636 self.task_executor().clone(),
637 metrics_hooks(self.provider_factory()),
638 self.data_dir().pprof_dumps(),
639 )
640 .with_push_gateway(
641 self.node_config().metrics.push_gateway_url.clone(),
642 self.node_config().metrics.push_gateway_interval,
643 );
644
645 MetricServer::new(config).serve().await?;
646 }
647
648 Ok(())
649 }
650
651 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
653 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())?;
654 Ok(self)
655 }
656
657 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
659 init_genesis_with_settings(self.provider_factory(), self.node_config().storage_settings())
660 }
661
662 pub fn with_metrics_task(
668 self,
669 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
670 let (metrics_sender, metrics_receiver) = unbounded_channel();
671
672 let with_metrics =
673 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
674
675 debug!(target: "reth::cli", "Spawning stages metrics listener task");
676 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
677 self.task_executor()
678 .spawn_critical_task("stages metrics listener task", sync_metrics_listener);
679
680 LaunchContextWith {
681 inner: self.inner,
682 attachment: self.attachment.map_right(|_| with_metrics),
683 }
684 }
685}
686
687impl<N, DB>
688 LaunchContextWith<
689 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
690 >
691where
692 N: NodeTypes,
693 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
694{
695 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
697 &self.right().provider_factory
698 }
699
700 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
702 self.right().metrics_sender.clone()
703 }
704
705 #[expect(clippy::complexity)]
707 pub fn with_blockchain_db<T, F>(
708 self,
709 create_blockchain_provider: F,
710 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
711 where
712 T: FullNodeTypes<Types = N, DB = DB>,
713 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
714 {
715 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
716
717 let metered_providers = WithMeteredProviders {
718 db_provider_container: WithMeteredProvider {
719 provider_factory: self.provider_factory().clone(),
720 metrics_sender: self.sync_metrics_tx(),
721 },
722 blockchain_db,
723 };
724
725 let ctx = LaunchContextWith {
726 inner: self.inner,
727 attachment: self.attachment.map_right(|_| metered_providers),
728 };
729
730 Ok(ctx)
731 }
732}
733
734impl<T>
735 LaunchContextWith<
736 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
737 >
738where
739 T: FullNodeTypes<Types: NodeTypesForProvider>,
740{
741 pub const fn database(&self) -> &T::DB {
743 self.provider_factory().db_ref()
744 }
745
746 pub const fn provider_factory(
748 &self,
749 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
750 &self.right().db_provider_container.provider_factory
751 }
752
753 pub fn lookup_head(&self) -> eyre::Result<Head> {
757 self.node_config()
758 .lookup_head(self.provider_factory())
759 .wrap_err("the head block is missing")
760 }
761
762 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
764 self.right().db_provider_container.metrics_sender.clone()
765 }
766
767 pub const fn blockchain_db(&self) -> &T::Provider {
769 &self.right().blockchain_db
770 }
771
772 pub async fn with_components<CB>(
774 self,
775 components_builder: CB,
776 on_component_initialized: Box<
777 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
778 >,
779 ) -> eyre::Result<
780 LaunchContextWith<
781 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
782 >,
783 >
784 where
785 CB: NodeComponentsBuilder<T>,
786 {
787 let head = self.lookup_head()?;
789
790 let builder_ctx = BuilderContext::new(
791 head,
792 self.blockchain_db().clone(),
793 self.task_executor().clone(),
794 self.configs().clone(),
795 );
796
797 debug!(target: "reth::cli", "creating components");
798 let components = components_builder.build_components(&builder_ctx).await?;
799
800 let blockchain_db = self.blockchain_db().clone();
801
802 let node_adapter = NodeAdapter {
803 components,
804 task_executor: self.task_executor().clone(),
805 provider: blockchain_db,
806 };
807
808 debug!(target: "reth::cli", "calling on_component_initialized hook");
809 on_component_initialized.on_event(node_adapter.clone())?;
810
811 let components_container = WithComponents {
812 db_provider_container: WithMeteredProvider {
813 provider_factory: self.provider_factory().clone(),
814 metrics_sender: self.sync_metrics_tx(),
815 },
816 node_adapter,
817 head,
818 };
819
820 let ctx = LaunchContextWith {
821 inner: self.inner,
822 attachment: self.attachment.map_right(|_| components_container),
823 };
824
825 Ok(ctx)
826 }
827}
828
829impl<T, CB>
830 LaunchContextWith<
831 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
832 >
833where
834 T: FullNodeTypes<Types: NodeTypesForProvider>,
835 CB: NodeComponentsBuilder<T>,
836{
837 pub const fn provider_factory(
839 &self,
840 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
841 &self.right().db_provider_container.provider_factory
842 }
843
844 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
847 where
848 C: HeadersClient<Header: BlockHeader>,
849 {
850 self.node_config().max_block(client, self.provider_factory().clone()).await
851 }
852
853 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
855 self.provider_factory().static_file_provider()
856 }
857
858 pub fn static_file_producer(
860 &self,
861 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
862 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
863 }
864
865 pub const fn head(&self) -> Head {
867 self.right().head
868 }
869
870 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
872 &self.right().node_adapter
873 }
874
875 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
877 &mut self.right_mut().node_adapter
878 }
879
880 pub const fn blockchain_db(&self) -> &T::Provider {
882 &self.node_adapter().provider
883 }
884
885 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
891 let mut initial_target = self.node_config().debug.tip;
892
893 if initial_target.is_none() {
894 initial_target = self.check_pipeline_consistency()?;
895 }
896
897 Ok(initial_target)
898 }
899
900 pub const fn terminate_after_initial_backfill(&self) -> bool {
906 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
907 }
908
909 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
914 if self.chain_spec().is_optimism() &&
915 !self.is_dev() &&
916 self.chain_id() == Chain::optimism_mainnet()
917 {
918 let latest = self.blockchain_db().last_block_number()?;
919 if latest < 105235063 {
921 error!(
922 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
923 );
924 return Err(ProviderError::BestBlockNotFound);
925 }
926 }
927
928 Ok(())
929 }
930
931 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
943 let era_enabled = self.era_import_source().is_some();
945 let mut all_stages =
946 StageId::ALL.into_iter().filter(|id| era_enabled || id != &StageId::Era);
947
948 let first_stage = all_stages.next().expect("there must be at least one stage");
950
951 let first_stage_checkpoint = self
954 .blockchain_db()
955 .get_stage_checkpoint(first_stage)?
956 .unwrap_or_default()
957 .block_number;
958
959 for stage_id in all_stages {
961 let stage_checkpoint = self
962 .blockchain_db()
963 .get_stage_checkpoint(stage_id)?
964 .unwrap_or_default()
965 .block_number;
966
967 debug!(
970 target: "consensus::engine",
971 first_stage_id = %first_stage,
972 first_stage_checkpoint,
973 stage_id = %stage_id,
974 stage_checkpoint = stage_checkpoint,
975 "Checking stage against first stage",
976 );
977 if stage_checkpoint < first_stage_checkpoint {
978 debug!(
979 target: "consensus::engine",
980 first_stage_id = %first_stage,
981 first_stage_checkpoint,
982 inconsistent_stage_id = %stage_id,
983 inconsistent_stage_checkpoint = stage_checkpoint,
984 "Pipeline sync progress is inconsistent"
985 );
986 return self.blockchain_db().block_hash(first_stage_checkpoint);
987 }
988 }
989
990 self.ensure_chain_specific_db_checks()?;
991
992 Ok(None)
993 }
994
995 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
997 self.right().db_provider_container.metrics_sender.clone()
998 }
999
1000 pub const fn components(&self) -> &CB::Components {
1002 &self.node_adapter().components
1003 }
1004
1005 #[allow(clippy::type_complexity)]
1007 pub async fn launch_exex(
1008 &self,
1009 installed_exex: Vec<(
1010 String,
1011 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1012 )>,
1013 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<T::Types>>>> {
1014 self.exex_launcher(installed_exex).launch().await
1015 }
1016
1017 #[allow(clippy::type_complexity)]
1029 pub fn exex_launcher(
1030 &self,
1031 installed_exex: Vec<(
1032 String,
1033 Box<dyn crate::exex::BoxedLaunchExEx<NodeAdapter<T, CB::Components>>>,
1034 )>,
1035 ) -> ExExLauncher<NodeAdapter<T, CB::Components>> {
1036 ExExLauncher::new(
1037 self.head(),
1038 self.node_adapter().clone(),
1039 installed_exex,
1040 self.configs().clone(),
1041 )
1042 }
1043
1044 pub fn era_import_source(&self) -> Option<EraImportSource> {
1048 let node_config = self.node_config();
1049 if !node_config.era.enabled {
1050 return None;
1051 }
1052
1053 EraImportSource::maybe_new(
1054 node_config.era.source.path.clone(),
1055 node_config.era.source.url.clone(),
1056 || node_config.chain.chain().kind().default_era_host(),
1057 || node_config.datadir().data_dir().join("era").into(),
1058 )
1059 }
1060
1061 pub fn consensus_layer_events(
1069 &self,
1070 ) -> impl Stream<Item = NodeEvent<PrimitivesTy<T::Types>>> + 'static
1071 where
1072 T::Provider: reth_provider::CanonChainTracker,
1073 {
1074 if self.node_config().debug.tip.is_none() && !self.is_dev() {
1075 Either::Left(
1076 ConsensusLayerHealthEvents::new(Box::new(self.blockchain_db().clone()))
1077 .map(Into::into),
1078 )
1079 } else {
1080 Either::Right(stream::empty())
1081 }
1082 }
1083
1084 pub async fn spawn_ethstats<St>(&self, mut engine_events: St) -> eyre::Result<()>
1086 where
1087 St: Stream<Item = reth_engine_primitives::ConsensusEngineEvent<PrimitivesTy<T::Types>>>
1088 + Send
1089 + Unpin
1090 + 'static,
1091 {
1092 let Some(url) = self.node_config().debug.ethstats.as_ref() else { return Ok(()) };
1093
1094 let network = self.components().network().clone();
1095 let pool = self.components().pool().clone();
1096 let provider = self.node_adapter().provider.clone();
1097
1098 info!(target: "reth::cli", "Starting EthStats service at {}", url);
1099
1100 let ethstats = EthStatsService::new(url, network, provider, pool).await?;
1101
1102 let ethstats_for_events = ethstats.clone();
1104 let task_executor = self.task_executor().clone();
1105 task_executor.spawn_task(async move {
1106 while let Some(event) = engine_events.next().await {
1107 use reth_engine_primitives::ConsensusEngineEvent;
1108 match event {
1109 ConsensusEngineEvent::ForkBlockAdded(executed, duration) |
1110 ConsensusEngineEvent::CanonicalBlockAdded(executed, duration) => {
1111 let block_hash = executed.recovered_block.num_hash().hash;
1112 let block_number = executed.recovered_block.num_hash().number;
1113 if let Err(e) = ethstats_for_events
1114 .report_new_payload(block_hash, block_number, duration)
1115 .await
1116 {
1117 debug!(
1118 target: "ethstats",
1119 "Failed to report new payload: {}", e
1120 );
1121 }
1122 }
1123 _ => {
1124 }
1126 }
1127 }
1128 });
1129
1130 task_executor.spawn_task(async move { ethstats.run().await });
1132
1133 Ok(())
1134 }
1135}
1136
/// A pair of two values, referred to as the left and the right side.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    /// The left value.
    left: L,
    /// The right value.
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a pair from its left and right values.
    pub const fn new(left: L, right: R) -> Self {
        Self { right, left }
    }

    /// Transforms the left value with `f`, keeping the right value as is.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        let Self { left, right } = self;
        Attached { left: f(left), right }
    }

    /// Transforms the right value with `f`, keeping the left value as is.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        let Self { left, right } = self;
        Attached { left, right: f(right) }
    }

    /// Borrows the left value.
    pub const fn left(&self) -> &L {
        let Self { left, .. } = self;
        left
    }

    /// Borrows the right value.
    pub const fn right(&self) -> &R {
        let Self { right, .. } = self;
        right
    }

    /// Mutably borrows the left value.
    pub const fn left_mut(&mut self) -> &mut L {
        let Self { left, .. } = self;
        left
    }

    /// Mutably borrows the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        let Self { right, .. } = self;
        right
    }
}
1190
/// The two configurations used during launch: the node config and the TOML config.
#[derive(Debug)]
pub struct WithConfigs<ChainSpec> {
    /// The node configuration.
    pub config: NodeConfig<ChainSpec>,
    /// The configuration loaded from the TOML file.
    pub toml_config: reth_config::Config,
}
1200
1201impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1202 fn clone(&self) -> Self {
1203 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1204 }
1205}
1206
/// A provider factory bundled with the sender side of the stage metrics channel.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    /// The provider factory.
    provider_factory: ProviderFactory<N>,
    /// Sender for [`MetricEvent`]s; the receiving end is handed to the metrics listener.
    metrics_sender: UnboundedSender<MetricEvent>,
}
1214
/// The metered provider factory together with the blockchain provider.
#[expect(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    /// Provider factory plus metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// The blockchain provider created on top of the provider factory.
    blockchain_db: T::Provider,
}
1225
/// The metered provider factory together with the node adapter (which holds the built
/// components) and the head.
#[expect(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    /// Provider factory plus metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    /// Adapter holding the components, the task executor and the blockchain provider.
    node_adapter: NodeAdapter<T, CB::Components>,
    /// The head that was looked up when the components were built.
    head: Head,
}
1237
1238pub fn metrics_hooks<N: NodeTypesWithDB>(provider_factory: &ProviderFactory<N>) -> Hooks {
1240 Hooks::builder()
1241 .with_hook({
1242 let db = provider_factory.db_ref().clone();
1243 move || throttle!(Duration::from_secs(5 * 60), || db.report_metrics())
1244 })
1245 .with_hook({
1246 let sfp = provider_factory.static_file_provider();
1247 move || {
1248 throttle!(Duration::from_secs(5 * 60), || {
1249 if let Err(error) = sfp.report_metrics() {
1250 error!(%error, "Failed to report metrics from static file provider");
1251 }
1252 })
1253 }
1254 })
1255 .with_hook({
1256 let rocksdb = provider_factory.rocksdb_provider();
1257 move || throttle!(Duration::from_secs(5 * 60), || rocksdb.report_metrics())
1258 })
1259 .build()
1260}
1261
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// File extension given to the temporary config file.
    const EXTENSION: &str = "toml";

    /// Runs `proc` with the path `<tempdir>/<filename>.toml` inside a fresh temporary
    /// directory and removes the directory afterwards.
    ///
    /// The file itself is not created here; only its path is passed on.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();
        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
        proc(&config_path);
        temp_dir.close().unwrap()
    }

    /// Saving the prune config must write a file that loads back equal to the in-memory
    /// config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let mut reth_config = Config::default();
            // Only `full` is enabled; every other pruning argument is spelled out explicitly.
            let node_config = NodeConfig {
                pruning: PruningArgs {
                    full: true,
                    minimal: false,
                    block_interval: None,
                    sender_recovery_full: false,
                    sender_recovery_distance: None,
                    sender_recovery_before: None,
                    transaction_lookup_full: false,
                    transaction_lookup_distance: None,
                    transaction_lookup_before: None,
                    receipts_full: false,
                    receipts_pre_merge: false,
                    receipts_distance: None,
                    receipts_before: None,
                    account_history_full: false,
                    account_history_distance: None,
                    account_history_before: None,
                    storage_history_full: false,
                    storage_history_distance: None,
                    storage_history_before: None,
                    bodies_pre_merge: false,
                    bodies_distance: None,
                    receipts_log_filter: None,
                    bodies_before: None,
                },
                ..NodeConfig::test()
            };
            LaunchContext::save_pruning_config(&mut reth_config, &node_config, config_path)
                .unwrap();

            // Read the file back from disk and compare with what is held in memory.
            let loaded_config = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, loaded_config);
        })
    }
}