1use std::{sync::Arc, thread::available_parallelism};
4
5use crate::{
6 components::{NodeComponents, NodeComponentsBuilder},
7 hooks::OnComponentInitializedHook,
8 BuilderContext, NodeAdapter,
9};
10use alloy_primitives::{BlockNumber, B256};
11use eyre::{Context, OptionExt};
12use rayon::ThreadPoolBuilder;
13use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
14use reth_config::{config::EtlConfig, PruneConfig};
15use reth_consensus::noop::NoopConsensus;
16use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
17use reth_db_common::init::{init_genesis, InitStorageError};
18use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
19use reth_engine_local::MiningMode;
20use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
21use reth_evm::noop::NoopBlockExecutorProvider;
22use reth_fs_util as fs;
23use reth_invalid_block_hooks::InvalidBlockWitnessHook;
24use reth_network_p2p::headers::client::HeadersClient;
25use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
26use reth_node_core::{
27 args::InvalidBlockHookType,
28 dirs::{ChainPath, DataDirPath},
29 node_config::NodeConfig,
30 primitives::BlockHeader,
31 version::{
32 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
33 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
34 },
35};
36use reth_node_metrics::{
37 chain::ChainSpecInfo,
38 hooks::Hooks,
39 recorder::install_prometheus_recorder,
40 server::{MetricServer, MetricServerConfig},
41 version::VersionInfo,
42};
43use reth_primitives::Head;
44use reth_provider::{
45 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
46 BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
47 ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
48};
49use reth_prune::{PruneModes, PrunerBuilder};
50use reth_rpc_api::clients::EthApiClient;
51use reth_rpc_builder::config::RethRpcServerConfig;
52use reth_rpc_layer::JwtSecret;
53use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
54use reth_static_file::StaticFileProducer;
55use reth_tasks::TaskExecutor;
56use reth_tracing::tracing::{debug, error, info, warn};
57use reth_transaction_pool::TransactionPool;
58use tokio::sync::{
59 mpsc::{unbounded_channel, UnboundedSender},
60 oneshot, watch,
61};
62
63#[derive(Debug, Clone)]
67pub struct LaunchContext {
68 pub task_executor: TaskExecutor,
70 pub data_dir: ChainPath<DataDirPath>,
72}
73
74impl LaunchContext {
75 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
77 Self { task_executor, data_dir }
78 }
79
80 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
82 LaunchContextWith { inner: self, attachment }
83 }
84
85 pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
90 self,
91 config: NodeConfig<ChainSpec>,
92 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
93 let toml_config = self.load_toml_config(&config)?;
94 Ok(self.with(WithConfigs { config, toml_config }))
95 }
96
97 pub fn load_toml_config<ChainSpec: EthChainSpec>(
102 &self,
103 config: &NodeConfig<ChainSpec>,
104 ) -> eyre::Result<reth_config::Config> {
105 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
106
107 let mut toml_config = reth_config::Config::from_path(&config_path)
108 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
109
110 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
111
112 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
113
114 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
116
117 Ok(toml_config)
118 }
119
120 fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
122 reth_config: &mut reth_config::Config,
123 config: &NodeConfig<ChainSpec>,
124 config_path: impl AsRef<std::path::Path>,
125 ) -> eyre::Result<()> {
126 if reth_config.prune.is_none() {
127 if let Some(prune_config) = config.prune_config() {
128 reth_config.update_prune_config(prune_config);
129 info!(target: "reth::cli", "Saving prune config to toml file");
130 reth_config.save(config_path.as_ref())?;
131 }
132 } else if config.prune_config().is_none() {
133 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
134 }
135 Ok(())
136 }
137
138 pub fn with_configured_globals(self) -> Self {
140 self.configure_globals();
141 self
142 }
143
144 pub fn configure_globals(&self) {
149 match fdlimit::raise_fd_limit() {
152 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
153 debug!(from, to, "Raised file descriptor limit");
154 }
155 Ok(fdlimit::Outcome::Unsupported) => {}
156 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
157 }
158
159 let num_threads =
162 available_parallelism().map_or(0, |num| num.get().saturating_sub(1).max(1));
163 if let Err(err) = ThreadPoolBuilder::new()
164 .num_threads(num_threads)
165 .thread_name(|i| format!("reth-rayon-{i}"))
166 .build_global()
167 {
168 error!(%err, "Failed to build global thread pool")
169 }
170 }
171}
172
173#[derive(Debug, Clone)]
179pub struct LaunchContextWith<T> {
180 pub inner: LaunchContext,
182 pub attachment: T,
184}
185
186impl<T> LaunchContextWith<T> {
187 pub fn configure_globals(&self) {
192 self.inner.configure_globals();
193 }
194
195 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
197 &self.inner.data_dir
198 }
199
200 pub const fn task_executor(&self) -> &TaskExecutor {
202 &self.inner.task_executor
203 }
204
205 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
207 LaunchContextWith {
208 inner: self.inner,
209 attachment: Attached::new(self.attachment, attachment),
210 }
211 }
212
213 pub fn inspect<F>(self, f: F) -> Self
216 where
217 F: FnOnce(&Self),
218 {
219 f(&self);
220 self
221 }
222}
223
224impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
225 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
227 if !self.attachment.config.network.trusted_peers.is_empty() {
228 info!(target: "reth::cli", "Adding trusted nodes");
229
230 self.attachment
231 .toml_config
232 .peers
233 .trusted_nodes
234 .extend(self.attachment.config.network.trusted_peers.clone());
235 }
236 Ok(self)
237 }
238}
239
240impl<L, R> LaunchContextWith<Attached<L, R>> {
241 pub const fn left(&self) -> &L {
243 &self.attachment.left
244 }
245
246 pub const fn right(&self) -> &R {
248 &self.attachment.right
249 }
250
251 pub fn left_mut(&mut self) -> &mut L {
253 &mut self.attachment.left
254 }
255
256 pub fn right_mut(&mut self) -> &mut R {
258 &mut self.attachment.right
259 }
260}
261impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
262 pub fn with_adjusted_configs(self) -> Self {
268 self.ensure_etl_datadir().with_adjusted_instance_ports()
269 }
270
271 pub fn ensure_etl_datadir(mut self) -> Self {
273 if self.toml_config_mut().stages.etl.dir.is_none() {
274 self.toml_config_mut().stages.etl.dir =
275 Some(EtlConfig::from_datadir(self.data_dir().data_dir()))
276 }
277
278 self
279 }
280
281 pub fn with_adjusted_instance_ports(mut self) -> Self {
283 self.node_config_mut().adjust_instance_ports();
284 self
285 }
286
287 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
289 self.attachment.left()
290 }
291
292 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
294 &self.left().config
295 }
296
297 pub fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
299 &mut self.left_mut().config
300 }
301
302 pub const fn toml_config(&self) -> &reth_config::Config {
304 &self.left().toml_config
305 }
306
307 pub fn toml_config_mut(&mut self) -> &mut reth_config::Config {
309 &mut self.left_mut().toml_config
310 }
311
312 pub fn chain_spec(&self) -> Arc<ChainSpec> {
314 self.node_config().chain.clone()
315 }
316
317 pub fn genesis_hash(&self) -> B256 {
319 self.node_config().chain.genesis_hash()
320 }
321
322 pub fn chain_id(&self) -> Chain {
324 self.node_config().chain.chain()
325 }
326
327 pub const fn is_dev(&self) -> bool {
329 self.node_config().dev.dev
330 }
331
332 pub fn prune_config(&self) -> Option<PruneConfig> {
335 let Some(mut node_prune_config) = self.node_config().prune_config() else {
336 return self.toml_config().prune.clone();
338 };
339
340 node_prune_config.merge(self.toml_config().prune.clone());
342 Some(node_prune_config)
343 }
344
345 pub fn prune_modes(&self) -> PruneModes {
347 self.prune_config().map(|config| config.segments).unwrap_or_default()
348 }
349
350 pub fn pruner_builder(&self) -> PrunerBuilder {
352 PrunerBuilder::new(self.prune_config().unwrap_or_default())
353 .delete_limit(self.chain_spec().prune_delete_limit())
354 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
355 }
356
357 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
359 let default_jwt_path = self.data_dir().jwt();
360 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
361 Ok(secret)
362 }
363
364 pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
366 if let Some(interval) = self.node_config().dev.block_time {
367 MiningMode::interval(interval)
368 } else {
369 MiningMode::instant(pool)
370 }
371 }
372}
373
374impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
375where
376 DB: Database + Clone + 'static,
377 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
378{
379 pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
383 where
384 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
385 {
386 let factory = ProviderFactory::new(
387 self.right().clone(),
388 self.chain_spec(),
389 StaticFileProvider::read_write(self.data_dir().static_files())?,
390 )
391 .with_prune_modes(self.prune_modes())
392 .with_static_files_metrics();
393
394 let has_receipt_pruning =
395 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
396
397 if let Some(unwind_target) = factory
400 .static_file_provider()
401 .check_consistency(&factory.provider()?, has_receipt_pruning)?
402 {
403 assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");
406
407 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
408
409 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
410
411 let pipeline = PipelineBuilder::default()
413 .add_stages(DefaultStages::new(
414 factory.clone(),
415 tip_rx,
416 Arc::new(NoopConsensus::default()),
417 NoopHeaderDownloader::default(),
418 NoopBodiesDownloader::default(),
419 NoopBlockExecutorProvider::<N::Primitives>::default(),
420 self.toml_config().stages.clone(),
421 self.prune_modes(),
422 ))
423 .build(
424 factory.clone(),
425 StaticFileProducer::new(factory.clone(), self.prune_modes()),
426 );
427
428 let (tx, rx) = oneshot::channel();
430
431 self.task_executor().spawn_critical_blocking(
433 "pipeline task",
434 Box::pin(async move {
435 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
436 let _ = tx.send(result);
437 }),
438 );
439 rx.await??;
440 }
441
442 Ok(factory)
443 }
444
445 pub async fn with_provider_factory<N>(
447 self,
448 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
449 where
450 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
451 {
452 let factory = self.create_provider_factory().await?;
453 let ctx = LaunchContextWith {
454 inner: self.inner,
455 attachment: self.attachment.map_right(|_| factory),
456 };
457
458 Ok(ctx)
459 }
460}
461
462impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
463where
464 T: ProviderNodeTypes,
465{
466 pub const fn database(&self) -> &T::DB {
468 self.right().db_ref()
469 }
470
471 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
473 self.right()
474 }
475
476 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
478 self.right().static_file_provider()
479 }
480
481 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
485 self.start_prometheus_endpoint().await?;
486 Ok(self)
487 }
488
489 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
491 install_prometheus_recorder().spawn_upkeep();
493
494 let listen_addr = self.node_config().metrics;
495 if let Some(addr) = listen_addr {
496 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
497 let config = MetricServerConfig::new(
498 addr,
499 VersionInfo {
500 version: CARGO_PKG_VERSION,
501 build_timestamp: VERGEN_BUILD_TIMESTAMP,
502 cargo_features: VERGEN_CARGO_FEATURES,
503 git_sha: VERGEN_GIT_SHA,
504 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
505 build_profile: BUILD_PROFILE_NAME,
506 },
507 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
508 self.task_executor().clone(),
509 Hooks::builder()
510 .with_hook({
511 let db = self.database().clone();
512 move || db.report_metrics()
513 })
514 .with_hook({
515 let sfp = self.static_file_provider();
516 move || {
517 if let Err(error) = sfp.report_metrics() {
518 error!(%error, "Failed to report metrics for the static file provider");
519 }
520 }
521 })
522 .build(),
523 );
524
525 MetricServer::new(config).serve().await?;
526 }
527
528 Ok(())
529 }
530
531 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
533 init_genesis(self.provider_factory())?;
534 Ok(self)
535 }
536
537 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
539 init_genesis(self.provider_factory())
540 }
541
542 pub fn with_metrics_task(
548 self,
549 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
550 let (metrics_sender, metrics_receiver) = unbounded_channel();
551
552 let with_metrics =
553 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
554
555 debug!(target: "reth::cli", "Spawning stages metrics listener task");
556 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
557 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
558
559 LaunchContextWith {
560 inner: self.inner,
561 attachment: self.attachment.map_right(|_| with_metrics),
562 }
563 }
564}
565
566impl<N, DB>
567 LaunchContextWith<
568 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
569 >
570where
571 N: NodeTypes,
572 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
573{
574 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
576 &self.right().provider_factory
577 }
578
579 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
581 self.right().metrics_sender.clone()
582 }
583
584 #[allow(clippy::complexity)]
586 pub fn with_blockchain_db<T, F>(
587 self,
588 create_blockchain_provider: F,
589 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
590 where
591 T: FullNodeTypes<Types = N, DB = DB>,
592 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
593 {
594 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
595
596 let metered_providers = WithMeteredProviders {
597 db_provider_container: WithMeteredProvider {
598 provider_factory: self.provider_factory().clone(),
599 metrics_sender: self.sync_metrics_tx(),
600 },
601 blockchain_db,
602 };
603
604 let ctx = LaunchContextWith {
605 inner: self.inner,
606 attachment: self.attachment.map_right(|_| metered_providers),
607 };
608
609 Ok(ctx)
610 }
611}
612
613impl<T>
614 LaunchContextWith<
615 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
616 >
617where
618 T: FullNodeTypes<Types: NodeTypesForProvider>,
619{
620 pub const fn database(&self) -> &T::DB {
622 self.provider_factory().db_ref()
623 }
624
625 pub const fn provider_factory(
627 &self,
628 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
629 &self.right().db_provider_container.provider_factory
630 }
631
632 pub fn lookup_head(&self) -> eyre::Result<Head> {
636 self.node_config()
637 .lookup_head(self.provider_factory())
638 .wrap_err("the head block is missing")
639 }
640
641 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
643 self.right().db_provider_container.metrics_sender.clone()
644 }
645
646 pub const fn blockchain_db(&self) -> &T::Provider {
648 &self.right().blockchain_db
649 }
650
651 pub async fn with_components<CB>(
653 self,
654 components_builder: CB,
655 on_component_initialized: Box<
656 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
657 >,
658 ) -> eyre::Result<
659 LaunchContextWith<
660 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
661 >,
662 >
663 where
664 CB: NodeComponentsBuilder<T>,
665 {
666 let head = self.lookup_head()?;
668
669 let builder_ctx = BuilderContext::new(
670 head,
671 self.blockchain_db().clone(),
672 self.task_executor().clone(),
673 self.configs().clone(),
674 );
675
676 debug!(target: "reth::cli", "creating components");
677 let components = components_builder.build_components(&builder_ctx).await?;
678
679 let blockchain_db = self.blockchain_db().clone();
680
681 let node_adapter = NodeAdapter {
682 components,
683 task_executor: self.task_executor().clone(),
684 provider: blockchain_db,
685 };
686
687 debug!(target: "reth::cli", "calling on_component_initialized hook");
688 on_component_initialized.on_event(node_adapter.clone())?;
689
690 let components_container = WithComponents {
691 db_provider_container: WithMeteredProvider {
692 provider_factory: self.provider_factory().clone(),
693 metrics_sender: self.sync_metrics_tx(),
694 },
695 node_adapter,
696 head,
697 };
698
699 let ctx = LaunchContextWith {
700 inner: self.inner,
701 attachment: self.attachment.map_right(|_| components_container),
702 };
703
704 Ok(ctx)
705 }
706}
707
708impl<T, CB>
709 LaunchContextWith<
710 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
711 >
712where
713 T: FullNodeTypes<Types: NodeTypesForProvider>,
714 CB: NodeComponentsBuilder<T>,
715{
716 pub const fn provider_factory(
718 &self,
719 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
720 &self.right().db_provider_container.provider_factory
721 }
722
723 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
726 where
727 C: HeadersClient<Header: BlockHeader>,
728 {
729 self.node_config().max_block(client, self.provider_factory().clone()).await
730 }
731
732 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
734 self.provider_factory().static_file_provider()
735 }
736
737 pub fn static_file_producer(
739 &self,
740 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
741 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
742 }
743
744 pub const fn head(&self) -> Head {
746 self.right().head
747 }
748
749 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
751 &self.right().node_adapter
752 }
753
754 pub fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
756 &mut self.right_mut().node_adapter
757 }
758
759 pub const fn blockchain_db(&self) -> &T::Provider {
761 &self.node_adapter().provider
762 }
763
764 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
770 let mut initial_target = self.node_config().debug.tip;
771
772 if initial_target.is_none() {
773 initial_target = self.check_pipeline_consistency()?;
774 }
775
776 Ok(initial_target)
777 }
778
779 pub const fn terminate_after_initial_backfill(&self) -> bool {
785 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
786 }
787
788 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
793 if self.chain_spec().is_optimism() &&
794 !self.is_dev() &&
795 self.chain_id() == Chain::optimism_mainnet()
796 {
797 let latest = self.blockchain_db().last_block_number()?;
798 if latest < 105235063 {
800 error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended");
801 return Err(ProviderError::BestBlockNotFound)
802 }
803 }
804
805 Ok(())
806 }
807
808 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
820 let first_stage_checkpoint = self
823 .blockchain_db()
824 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
825 .unwrap_or_default()
826 .block_number;
827
828 for stage_id in StageId::ALL.iter().skip(1) {
831 let stage_checkpoint = self
832 .blockchain_db()
833 .get_stage_checkpoint(*stage_id)?
834 .unwrap_or_default()
835 .block_number;
836
837 if stage_checkpoint < first_stage_checkpoint {
840 debug!(
841 target: "consensus::engine",
842 first_stage_checkpoint,
843 inconsistent_stage_id = %stage_id,
844 inconsistent_stage_checkpoint = stage_checkpoint,
845 "Pipeline sync progress is inconsistent"
846 );
847 return self.blockchain_db().block_hash(first_stage_checkpoint);
848 }
849 }
850
851 self.ensure_chain_specific_db_checks()?;
852
853 Ok(None)
854 }
855
856 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
858 self.right().db_provider_container.metrics_sender.clone()
859 }
860
861 pub const fn components(&self) -> &CB::Components {
863 &self.node_adapter().components
864 }
865}
866
867impl<T, CB>
868 LaunchContextWith<
869 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
870 >
871where
872 T: FullNodeTypes<
873 Provider: StateProviderFactory + ChainSpecProvider,
874 Types: NodeTypesForProvider,
875 >,
876 CB: NodeComponentsBuilder<T>,
877{
878 pub fn invalid_block_hook(
880 &self,
881 ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
882 let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
883 return Ok(Box::new(NoopInvalidBlockHook::default()))
884 };
885 let healthy_node_rpc_client = self.get_healthy_node_client()?;
886
887 let output_directory = self.data_dir().invalid_block_hooks();
888 let hooks = hook
889 .iter()
890 .copied()
891 .map(|hook| {
892 let output_directory = output_directory.join(hook.to_string());
893 fs::create_dir_all(&output_directory)?;
894
895 Ok(match hook {
896 InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
897 self.blockchain_db().clone(),
898 self.components().block_executor().clone(),
899 output_directory,
900 healthy_node_rpc_client.clone(),
901 )),
902 InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
903 eyre::bail!("invalid block hook {hook:?} is not implemented yet")
904 }
905 } as Box<dyn InvalidBlockHook<_>>)
906 })
907 .collect::<Result<_, _>>()?;
908
909 Ok(Box::new(InvalidBlockHooks(hooks)))
910 }
911
912 fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
914 self.node_config()
915 .debug
916 .healthy_node_rpc_url
917 .as_ref()
918 .map(|url| {
919 let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
920
921 let chain_id = futures::executor::block_on(async {
923 EthApiClient::<
924 alloy_rpc_types::Transaction,
925 alloy_rpc_types::Block,
926 alloy_rpc_types::Receipt,
927 alloy_rpc_types::Header,
928 >::chain_id(&client)
929 .await
930 })?
931 .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
932 if chain_id.to::<u64>() != self.chain_id().id() {
933 eyre::bail!("invalid chain id for healthy node: {chain_id}")
934 }
935
936 Ok(client)
937 })
938 .transpose()
939 }
940}
941
/// Joins two values into one: a `left` value and a `right` value.
///
/// Either side can be read, mutated, or replaced (see [`Attached::map_left`] and
/// [`Attached::map_right`]) independently of the other.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new pair from a left and a right value.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Transforms the left value with `f`, keeping the right value as is.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        Attached::new(f(self.left), self.right)
    }

    /// Transforms the right value with `f`, keeping the left value as is.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        Attached::new(self.left, f(self.right))
    }

    /// Returns a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Returns a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Returns a mutable reference to the left value.
    // Previously this returned `&mut self.right` typed as `&mut R`, which made it an alias of
    // `right_mut` and left no way to mutate the left value through this type.
    pub fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Returns a mutable reference to the right value.
    pub fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}
991
992#[derive(Debug)]
995pub struct WithConfigs<ChainSpec> {
996 pub config: NodeConfig<ChainSpec>,
998 pub toml_config: reth_config::Config,
1000}
1001
1002impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1003 fn clone(&self) -> Self {
1004 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1005 }
1006}
1007
1008#[derive(Debug, Clone)]
1011pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1012 provider_factory: ProviderFactory<N>,
1013 metrics_sender: UnboundedSender<MetricEvent>,
1014}
1015
1016#[allow(missing_debug_implementations)]
1019pub struct WithMeteredProviders<T>
1020where
1021 T: FullNodeTypes,
1022{
1023 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1024 blockchain_db: T::Provider,
1025}
1026
1027#[allow(missing_debug_implementations)]
1029pub struct WithComponents<T, CB>
1030where
1031 T: FullNodeTypes,
1032 CB: NodeComponentsBuilder<T>,
1033{
1034 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1035 node_adapter: NodeAdapter<T, CB::Components>,
1036 head: Head,
1037}
1038
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    /// File extension used for the temporary config file.
    const EXTENSION: &str = "toml";

    /// Calls `proc` with the path `<tempdir>/<filename>.toml` inside a fresh temporary
    /// directory and closes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let scratch_dir = tempfile::tempdir().unwrap();
        let target_path = scratch_dir.path().join(filename).with_extension(EXTENSION);
        proc(target_path.as_path());
        scratch_dir.close().unwrap()
    }

    /// With `full: true` and no prune section in the config, saving must write a file that
    /// loads back equal to the in-memory config.
    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            // Only `full` is enabled; every other pruning option stays off.
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                receipts_log_filter: vec![],
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            let mut reth_config = Config::default();
            LaunchContext::save_pruning_config_if_full_node(
                &mut reth_config,
                &node_config,
                config_path,
            )
            .unwrap();

            let loaded_config = Config::from_path(config_path).unwrap();

            assert_eq!(reth_config, loaded_config);
        })
    }
}