1use crate::{
4 components::{NodeComponents, NodeComponentsBuilder},
5 hooks::OnComponentInitializedHook,
6 BuilderContext, NodeAdapter,
7};
8use alloy_eips::eip2124::Head;
9use alloy_primitives::{BlockNumber, B256};
10use eyre::{Context, OptionExt};
11use rayon::ThreadPoolBuilder;
12use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
13use reth_config::{config::EtlConfig, PruneConfig};
14use reth_consensus::noop::NoopConsensus;
15use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
16use reth_db_common::init::{init_genesis, InitStorageError};
17use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
18use reth_engine_local::MiningMode;
19use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
20use reth_evm::noop::NoopBlockExecutorProvider;
21use reth_fs_util as fs;
22use reth_invalid_block_hooks::InvalidBlockWitnessHook;
23use reth_network_p2p::headers::client::HeadersClient;
24use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
25use reth_node_core::{
26 args::InvalidBlockHookType,
27 dirs::{ChainPath, DataDirPath},
28 node_config::NodeConfig,
29 primitives::BlockHeader,
30 version::{
31 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
32 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
33 },
34};
35use reth_node_metrics::{
36 chain::ChainSpecInfo,
37 hooks::Hooks,
38 recorder::install_prometheus_recorder,
39 server::{MetricServer, MetricServerConfig},
40 version::VersionInfo,
41};
42use reth_provider::{
43 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
44 BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
45 ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
46};
47use reth_prune::{PruneModes, PrunerBuilder};
48use reth_rpc_api::clients::EthApiClient;
49use reth_rpc_builder::config::RethRpcServerConfig;
50use reth_rpc_layer::JwtSecret;
51use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
52use reth_static_file::StaticFileProducer;
53use reth_tasks::TaskExecutor;
54use reth_tracing::tracing::{debug, error, info, warn};
55use reth_transaction_pool::TransactionPool;
56use std::{sync::Arc, thread::available_parallelism};
57use tokio::sync::{
58 mpsc::{unbounded_channel, UnboundedSender},
59 oneshot, watch,
60};
61
62#[derive(Debug, Clone)]
66pub struct LaunchContext {
67 pub task_executor: TaskExecutor,
69 pub data_dir: ChainPath<DataDirPath>,
71}
72
73impl LaunchContext {
74 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
76 Self { task_executor, data_dir }
77 }
78
79 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
81 LaunchContextWith { inner: self, attachment }
82 }
83
84 pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
89 self,
90 config: NodeConfig<ChainSpec>,
91 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
92 let toml_config = self.load_toml_config(&config)?;
93 Ok(self.with(WithConfigs { config, toml_config }))
94 }
95
96 pub fn load_toml_config<ChainSpec: EthChainSpec>(
101 &self,
102 config: &NodeConfig<ChainSpec>,
103 ) -> eyre::Result<reth_config::Config> {
104 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
105
106 let mut toml_config = reth_config::Config::from_path(&config_path)
107 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
108
109 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
110
111 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
112
113 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
115
116 Ok(toml_config)
117 }
118
119 fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
121 reth_config: &mut reth_config::Config,
122 config: &NodeConfig<ChainSpec>,
123 config_path: impl AsRef<std::path::Path>,
124 ) -> eyre::Result<()> {
125 if reth_config.prune.is_none() {
126 if let Some(prune_config) = config.prune_config() {
127 reth_config.update_prune_config(prune_config);
128 info!(target: "reth::cli", "Saving prune config to toml file");
129 reth_config.save(config_path.as_ref())?;
130 }
131 } else if config.prune_config().is_none() {
132 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
133 }
134 Ok(())
135 }
136
137 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
139 self.configure_globals(reserved_cpu_cores);
140 self
141 }
142
143 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
150 match fdlimit::raise_fd_limit() {
153 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
154 debug!(from, to, "Raised file descriptor limit");
155 }
156 Ok(fdlimit::Outcome::Unsupported) => {}
157 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
158 }
159
160 let num_threads = available_parallelism()
164 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
165 if let Err(err) = ThreadPoolBuilder::new()
166 .num_threads(num_threads)
167 .thread_name(|i| format!("reth-rayon-{i}"))
168 .build_global()
169 {
170 error!(%err, "Failed to build global thread pool")
171 }
172 }
173}
174
175#[derive(Debug, Clone)]
181pub struct LaunchContextWith<T> {
182 pub inner: LaunchContext,
184 pub attachment: T,
186}
187
188impl<T> LaunchContextWith<T> {
189 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
194 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
195 }
196
197 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
199 &self.inner.data_dir
200 }
201
202 pub const fn task_executor(&self) -> &TaskExecutor {
204 &self.inner.task_executor
205 }
206
207 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
209 LaunchContextWith {
210 inner: self.inner,
211 attachment: Attached::new(self.attachment, attachment),
212 }
213 }
214
215 pub fn inspect<F>(self, f: F) -> Self
218 where
219 F: FnOnce(&Self),
220 {
221 f(&self);
222 self
223 }
224}
225
226impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
227 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
229 if !self.attachment.config.network.trusted_peers.is_empty() {
230 info!(target: "reth::cli", "Adding trusted nodes");
231
232 self.attachment
233 .toml_config
234 .peers
235 .trusted_nodes
236 .extend(self.attachment.config.network.trusted_peers.clone());
237 }
238 Ok(self)
239 }
240}
241
242impl<L, R> LaunchContextWith<Attached<L, R>> {
243 pub const fn left(&self) -> &L {
245 &self.attachment.left
246 }
247
248 pub const fn right(&self) -> &R {
250 &self.attachment.right
251 }
252
253 pub const fn left_mut(&mut self) -> &mut L {
255 &mut self.attachment.left
256 }
257
258 pub const fn right_mut(&mut self) -> &mut R {
260 &mut self.attachment.right
261 }
262}
263impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
264 pub fn with_adjusted_configs(self) -> Self {
270 self.ensure_etl_datadir().with_adjusted_instance_ports()
271 }
272
273 pub fn ensure_etl_datadir(mut self) -> Self {
275 if self.toml_config_mut().stages.etl.dir.is_none() {
276 self.toml_config_mut().stages.etl.dir =
277 Some(EtlConfig::from_datadir(self.data_dir().data_dir()))
278 }
279
280 self
281 }
282
283 pub fn with_adjusted_instance_ports(mut self) -> Self {
285 self.node_config_mut().adjust_instance_ports();
286 self
287 }
288
289 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
291 self.attachment.left()
292 }
293
294 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
296 &self.left().config
297 }
298
299 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
301 &mut self.left_mut().config
302 }
303
304 pub const fn toml_config(&self) -> &reth_config::Config {
306 &self.left().toml_config
307 }
308
309 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
311 &mut self.left_mut().toml_config
312 }
313
314 pub fn chain_spec(&self) -> Arc<ChainSpec> {
316 self.node_config().chain.clone()
317 }
318
319 pub fn genesis_hash(&self) -> B256 {
321 self.node_config().chain.genesis_hash()
322 }
323
324 pub fn chain_id(&self) -> Chain {
326 self.node_config().chain.chain()
327 }
328
329 pub const fn is_dev(&self) -> bool {
331 self.node_config().dev.dev
332 }
333
334 pub fn prune_config(&self) -> Option<PruneConfig> {
337 let Some(mut node_prune_config) = self.node_config().prune_config() else {
338 return self.toml_config().prune.clone();
340 };
341
342 node_prune_config.merge(self.toml_config().prune.clone());
344 Some(node_prune_config)
345 }
346
347 pub fn prune_modes(&self) -> PruneModes {
349 self.prune_config().map(|config| config.segments).unwrap_or_default()
350 }
351
352 pub fn pruner_builder(&self) -> PrunerBuilder {
354 PrunerBuilder::new(self.prune_config().unwrap_or_default())
355 .delete_limit(self.chain_spec().prune_delete_limit())
356 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
357 }
358
359 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
361 let default_jwt_path = self.data_dir().jwt();
362 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
363 Ok(secret)
364 }
365
366 pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
368 if let Some(interval) = self.node_config().dev.block_time {
369 MiningMode::interval(interval)
370 } else {
371 MiningMode::instant(pool)
372 }
373 }
374}
375
376impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
377where
378 DB: Database + Clone + 'static,
379 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
380{
381 pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
385 where
386 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
387 {
388 let factory = ProviderFactory::new(
389 self.right().clone(),
390 self.chain_spec(),
391 StaticFileProvider::read_write(self.data_dir().static_files())?,
392 )
393 .with_prune_modes(self.prune_modes())
394 .with_static_files_metrics();
395
396 let has_receipt_pruning =
397 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
398
399 if let Some(unwind_target) = factory
402 .static_file_provider()
403 .check_consistency(&factory.provider()?, has_receipt_pruning)?
404 {
405 assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");
408
409 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
410
411 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
412
413 let pipeline = PipelineBuilder::default()
415 .add_stages(DefaultStages::new(
416 factory.clone(),
417 tip_rx,
418 Arc::new(NoopConsensus::default()),
419 NoopHeaderDownloader::default(),
420 NoopBodiesDownloader::default(),
421 NoopBlockExecutorProvider::<N::Primitives>::default(),
422 self.toml_config().stages.clone(),
423 self.prune_modes(),
424 ))
425 .build(
426 factory.clone(),
427 StaticFileProducer::new(factory.clone(), self.prune_modes()),
428 );
429
430 let (tx, rx) = oneshot::channel();
432
433 self.task_executor().spawn_critical_blocking(
435 "pipeline task",
436 Box::pin(async move {
437 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
438 let _ = tx.send(result);
439 }),
440 );
441 rx.await??;
442 }
443
444 Ok(factory)
445 }
446
447 pub async fn with_provider_factory<N>(
449 self,
450 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
451 where
452 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
453 {
454 let factory = self.create_provider_factory().await?;
455 let ctx = LaunchContextWith {
456 inner: self.inner,
457 attachment: self.attachment.map_right(|_| factory),
458 };
459
460 Ok(ctx)
461 }
462}
463
464impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
465where
466 T: ProviderNodeTypes,
467{
468 pub const fn database(&self) -> &T::DB {
470 self.right().db_ref()
471 }
472
473 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
475 self.right()
476 }
477
478 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
480 self.right().static_file_provider()
481 }
482
483 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
487 self.start_prometheus_endpoint().await?;
488 Ok(self)
489 }
490
491 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
493 install_prometheus_recorder().spawn_upkeep();
495
496 let listen_addr = self.node_config().metrics;
497 if let Some(addr) = listen_addr {
498 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
499 let config = MetricServerConfig::new(
500 addr,
501 VersionInfo {
502 version: CARGO_PKG_VERSION,
503 build_timestamp: VERGEN_BUILD_TIMESTAMP,
504 cargo_features: VERGEN_CARGO_FEATURES,
505 git_sha: VERGEN_GIT_SHA,
506 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
507 build_profile: BUILD_PROFILE_NAME,
508 },
509 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
510 self.task_executor().clone(),
511 Hooks::builder()
512 .with_hook({
513 let db = self.database().clone();
514 move || db.report_metrics()
515 })
516 .with_hook({
517 let sfp = self.static_file_provider();
518 move || {
519 if let Err(error) = sfp.report_metrics() {
520 error!(%error, "Failed to report metrics for the static file provider");
521 }
522 }
523 })
524 .build(),
525 );
526
527 MetricServer::new(config).serve().await?;
528 }
529
530 Ok(())
531 }
532
533 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
535 init_genesis(self.provider_factory())?;
536 Ok(self)
537 }
538
539 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
541 init_genesis(self.provider_factory())
542 }
543
544 pub fn with_metrics_task(
550 self,
551 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
552 let (metrics_sender, metrics_receiver) = unbounded_channel();
553
554 let with_metrics =
555 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
556
557 debug!(target: "reth::cli", "Spawning stages metrics listener task");
558 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
559 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
560
561 LaunchContextWith {
562 inner: self.inner,
563 attachment: self.attachment.map_right(|_| with_metrics),
564 }
565 }
566}
567
568impl<N, DB>
569 LaunchContextWith<
570 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
571 >
572where
573 N: NodeTypes,
574 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
575{
576 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
578 &self.right().provider_factory
579 }
580
581 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
583 self.right().metrics_sender.clone()
584 }
585
586 #[expect(clippy::complexity)]
588 pub fn with_blockchain_db<T, F>(
589 self,
590 create_blockchain_provider: F,
591 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
592 where
593 T: FullNodeTypes<Types = N, DB = DB>,
594 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
595 {
596 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
597
598 let metered_providers = WithMeteredProviders {
599 db_provider_container: WithMeteredProvider {
600 provider_factory: self.provider_factory().clone(),
601 metrics_sender: self.sync_metrics_tx(),
602 },
603 blockchain_db,
604 };
605
606 let ctx = LaunchContextWith {
607 inner: self.inner,
608 attachment: self.attachment.map_right(|_| metered_providers),
609 };
610
611 Ok(ctx)
612 }
613}
614
615impl<T>
616 LaunchContextWith<
617 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
618 >
619where
620 T: FullNodeTypes<Types: NodeTypesForProvider>,
621{
622 pub const fn database(&self) -> &T::DB {
624 self.provider_factory().db_ref()
625 }
626
627 pub const fn provider_factory(
629 &self,
630 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
631 &self.right().db_provider_container.provider_factory
632 }
633
634 pub fn lookup_head(&self) -> eyre::Result<Head> {
638 self.node_config()
639 .lookup_head(self.provider_factory())
640 .wrap_err("the head block is missing")
641 }
642
643 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
645 self.right().db_provider_container.metrics_sender.clone()
646 }
647
648 pub const fn blockchain_db(&self) -> &T::Provider {
650 &self.right().blockchain_db
651 }
652
653 pub async fn with_components<CB>(
655 self,
656 components_builder: CB,
657 on_component_initialized: Box<
658 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
659 >,
660 ) -> eyre::Result<
661 LaunchContextWith<
662 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
663 >,
664 >
665 where
666 CB: NodeComponentsBuilder<T>,
667 {
668 let head = self.lookup_head()?;
670
671 let builder_ctx = BuilderContext::new(
672 head,
673 self.blockchain_db().clone(),
674 self.task_executor().clone(),
675 self.configs().clone(),
676 );
677
678 debug!(target: "reth::cli", "creating components");
679 let components = components_builder.build_components(&builder_ctx).await?;
680
681 let blockchain_db = self.blockchain_db().clone();
682
683 let node_adapter = NodeAdapter {
684 components,
685 task_executor: self.task_executor().clone(),
686 provider: blockchain_db,
687 };
688
689 debug!(target: "reth::cli", "calling on_component_initialized hook");
690 on_component_initialized.on_event(node_adapter.clone())?;
691
692 let components_container = WithComponents {
693 db_provider_container: WithMeteredProvider {
694 provider_factory: self.provider_factory().clone(),
695 metrics_sender: self.sync_metrics_tx(),
696 },
697 node_adapter,
698 head,
699 };
700
701 let ctx = LaunchContextWith {
702 inner: self.inner,
703 attachment: self.attachment.map_right(|_| components_container),
704 };
705
706 Ok(ctx)
707 }
708}
709
710impl<T, CB>
711 LaunchContextWith<
712 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
713 >
714where
715 T: FullNodeTypes<Types: NodeTypesForProvider>,
716 CB: NodeComponentsBuilder<T>,
717{
718 pub const fn provider_factory(
720 &self,
721 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
722 &self.right().db_provider_container.provider_factory
723 }
724
725 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
728 where
729 C: HeadersClient<Header: BlockHeader>,
730 {
731 self.node_config().max_block(client, self.provider_factory().clone()).await
732 }
733
734 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
736 self.provider_factory().static_file_provider()
737 }
738
739 pub fn static_file_producer(
741 &self,
742 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
743 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
744 }
745
746 pub const fn head(&self) -> Head {
748 self.right().head
749 }
750
751 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
753 &self.right().node_adapter
754 }
755
756 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
758 &mut self.right_mut().node_adapter
759 }
760
761 pub const fn blockchain_db(&self) -> &T::Provider {
763 &self.node_adapter().provider
764 }
765
766 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
772 let mut initial_target = self.node_config().debug.tip;
773
774 if initial_target.is_none() {
775 initial_target = self.check_pipeline_consistency()?;
776 }
777
778 Ok(initial_target)
779 }
780
781 pub const fn terminate_after_initial_backfill(&self) -> bool {
787 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
788 }
789
790 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
795 if self.chain_spec().is_optimism() &&
796 !self.is_dev() &&
797 self.chain_id() == Chain::optimism_mainnet()
798 {
799 let latest = self.blockchain_db().last_block_number()?;
800 if latest < 105235063 {
802 error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended");
803 return Err(ProviderError::BestBlockNotFound)
804 }
805 }
806
807 Ok(())
808 }
809
810 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
822 let first_stage_checkpoint = self
825 .blockchain_db()
826 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
827 .unwrap_or_default()
828 .block_number;
829
830 for stage_id in StageId::ALL.iter().skip(1) {
833 let stage_checkpoint = self
834 .blockchain_db()
835 .get_stage_checkpoint(*stage_id)?
836 .unwrap_or_default()
837 .block_number;
838
839 if stage_checkpoint < first_stage_checkpoint {
842 debug!(
843 target: "consensus::engine",
844 first_stage_checkpoint,
845 inconsistent_stage_id = %stage_id,
846 inconsistent_stage_checkpoint = stage_checkpoint,
847 "Pipeline sync progress is inconsistent"
848 );
849 return self.blockchain_db().block_hash(first_stage_checkpoint);
850 }
851 }
852
853 self.ensure_chain_specific_db_checks()?;
854
855 Ok(None)
856 }
857
858 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
860 self.right().db_provider_container.metrics_sender.clone()
861 }
862
863 pub const fn components(&self) -> &CB::Components {
865 &self.node_adapter().components
866 }
867}
868
869impl<T, CB>
870 LaunchContextWith<
871 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
872 >
873where
874 T: FullNodeTypes<
875 Provider: StateProviderFactory + ChainSpecProvider,
876 Types: NodeTypesForProvider,
877 >,
878 CB: NodeComponentsBuilder<T>,
879{
880 pub fn invalid_block_hook(
882 &self,
883 ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
884 let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
885 return Ok(Box::new(NoopInvalidBlockHook::default()))
886 };
887 let healthy_node_rpc_client = self.get_healthy_node_client()?;
888
889 let output_directory = self.data_dir().invalid_block_hooks();
890 let hooks = hook
891 .iter()
892 .copied()
893 .map(|hook| {
894 let output_directory = output_directory.join(hook.to_string());
895 fs::create_dir_all(&output_directory)?;
896
897 Ok(match hook {
898 InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
899 self.blockchain_db().clone(),
900 self.components().block_executor().clone(),
901 output_directory,
902 healthy_node_rpc_client.clone(),
903 )),
904 InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
905 eyre::bail!("invalid block hook {hook:?} is not implemented yet")
906 }
907 } as Box<dyn InvalidBlockHook<_>>)
908 })
909 .collect::<Result<_, _>>()?;
910
911 Ok(Box::new(InvalidBlockHooks(hooks)))
912 }
913
914 fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
916 self.node_config()
917 .debug
918 .healthy_node_rpc_url
919 .as_ref()
920 .map(|url| {
921 let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
922
923 let chain_id = futures::executor::block_on(async {
925 EthApiClient::<
926 alloy_rpc_types::Transaction,
927 alloy_rpc_types::Block,
928 alloy_rpc_types::Receipt,
929 alloy_rpc_types::Header,
930 >::chain_id(&client)
931 .await
932 })?
933 .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
934 if chain_id.to::<u64>() != self.chain_id().id() {
935 eyre::bail!("invalid chain id for healthy node: {chain_id}")
936 }
937
938 Ok(client)
939 })
940 .transpose()
941 }
942}
943
/// Joins two values of possibly different types.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new pair from a left and a right value.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Applies `f` to the left value and keeps the right value.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        Attached::new(f(self.left), self.right)
    }

    /// Applies `f` to the right value and keeps the left value.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        Attached::new(self.left, f(self.right))
    }

    /// Returns a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Returns a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Returns a mutable reference to the left value.
    // This previously returned `&mut self.right` typed as `&mut R`, duplicating `right_mut`.
    pub const fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Returns a mutable reference to the right value.
    pub const fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}
993
994#[derive(Debug)]
997pub struct WithConfigs<ChainSpec> {
998 pub config: NodeConfig<ChainSpec>,
1000 pub toml_config: reth_config::Config,
1002}
1003
1004impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1005 fn clone(&self) -> Self {
1006 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1007 }
1008}
1009
1010#[derive(Debug, Clone)]
1013pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1014 provider_factory: ProviderFactory<N>,
1015 metrics_sender: UnboundedSender<MetricEvent>,
1016}
1017
1018#[expect(missing_debug_implementations)]
1021pub struct WithMeteredProviders<T>
1022where
1023 T: FullNodeTypes,
1024{
1025 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1026 blockchain_db: T::Provider,
1027}
1028
1029#[expect(missing_debug_implementations)]
1031pub struct WithComponents<T, CB>
1032where
1033 T: FullNodeTypes,
1034 CB: NodeComponentsBuilder<T>,
1035{
1036 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1037 node_adapter: NodeAdapter<T, CB::Components>,
1038 head: Head,
1039}
1040
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    const EXTENSION: &str = "toml";

    /// Runs `proc` with a `<filename>.toml` path inside a fresh temporary directory, then
    /// closes the directory.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let temp_dir = tempfile::tempdir().unwrap();

        let mut config_path = temp_dir.path().join(filename);
        config_path.set_extension(EXTENSION);

        proc(&config_path);

        temp_dir.close().unwrap()
    }

    /// With `full: true` and no prune section in the config, the prune config is saved to
    /// disk; the file must load back equal to the in-memory config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                receipts_log_filter: vec![],
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut in_memory = Config::default();
            LaunchContext::save_pruning_config_if_full_node(
                &mut in_memory,
                &node_config,
                config_path,
            )
            .unwrap();

            let from_disk = Config::from_path(config_path).unwrap();

            assert_eq!(in_memory, from_disk);
        })
    }
}