1use crate::{
4 components::{NodeComponents, NodeComponentsBuilder},
5 hooks::OnComponentInitializedHook,
6 BuilderContext, NodeAdapter,
7};
8use alloy_eips::eip2124::Head;
9use alloy_primitives::{BlockNumber, B256};
10use eyre::{Context, OptionExt};
11use rayon::ThreadPoolBuilder;
12use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
13use reth_config::{config::EtlConfig, PruneConfig};
14use reth_consensus::noop::NoopConsensus;
15use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
16use reth_db_common::init::{init_genesis, InitStorageError};
17use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
18use reth_engine_local::MiningMode;
19use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
20use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
21use reth_fs_util as fs;
22use reth_invalid_block_hooks::InvalidBlockWitnessHook;
23use reth_network_p2p::headers::client::HeadersClient;
24use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
25use reth_node_core::{
26 args::InvalidBlockHookType,
27 dirs::{ChainPath, DataDirPath},
28 node_config::NodeConfig,
29 primitives::BlockHeader,
30 version::{
31 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
32 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
33 },
34};
35use reth_node_metrics::{
36 chain::ChainSpecInfo,
37 hooks::Hooks,
38 recorder::install_prometheus_recorder,
39 server::{MetricServer, MetricServerConfig},
40 version::VersionInfo,
41};
42use reth_provider::{
43 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
44 BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
45 ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
46};
47use reth_prune::{PruneModes, PrunerBuilder};
48use reth_rpc_api::clients::EthApiClient;
49use reth_rpc_builder::config::RethRpcServerConfig;
50use reth_rpc_layer::JwtSecret;
51use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
52use reth_static_file::StaticFileProducer;
53use reth_tasks::TaskExecutor;
54use reth_tracing::tracing::{debug, error, info, warn};
55use reth_transaction_pool::TransactionPool;
56use std::{sync::Arc, thread::available_parallelism};
57use tokio::sync::{
58 mpsc::{unbounded_channel, UnboundedSender},
59 oneshot, watch,
60};
61
62#[derive(Debug, Clone)]
66pub struct LaunchContext {
67 pub task_executor: TaskExecutor,
69 pub data_dir: ChainPath<DataDirPath>,
71}
72
73impl LaunchContext {
74 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
76 Self { task_executor, data_dir }
77 }
78
79 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
81 LaunchContextWith { inner: self, attachment }
82 }
83
84 pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
89 self,
90 config: NodeConfig<ChainSpec>,
91 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
92 let toml_config = self.load_toml_config(&config)?;
93 Ok(self.with(WithConfigs { config, toml_config }))
94 }
95
96 pub fn load_toml_config<ChainSpec: EthChainSpec>(
101 &self,
102 config: &NodeConfig<ChainSpec>,
103 ) -> eyre::Result<reth_config::Config> {
104 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
105
106 let mut toml_config = reth_config::Config::from_path(&config_path)
107 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
108
109 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
110
111 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
112
113 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
115
116 Ok(toml_config)
117 }
118
119 fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
121 reth_config: &mut reth_config::Config,
122 config: &NodeConfig<ChainSpec>,
123 config_path: impl AsRef<std::path::Path>,
124 ) -> eyre::Result<()> {
125 if reth_config.prune.is_none() {
126 if let Some(prune_config) = config.prune_config() {
127 reth_config.update_prune_config(prune_config);
128 info!(target: "reth::cli", "Saving prune config to toml file");
129 reth_config.save(config_path.as_ref())?;
130 }
131 } else if config.prune_config().is_none() {
132 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
133 }
134 Ok(())
135 }
136
137 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
139 self.configure_globals(reserved_cpu_cores);
140 self
141 }
142
143 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
150 match fdlimit::raise_fd_limit() {
153 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
154 debug!(from, to, "Raised file descriptor limit");
155 }
156 Ok(fdlimit::Outcome::Unsupported) => {}
157 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
158 }
159
160 let num_threads = available_parallelism()
164 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
165 if let Err(err) = ThreadPoolBuilder::new()
166 .num_threads(num_threads)
167 .thread_name(|i| format!("reth-rayon-{i}"))
168 .build_global()
169 {
170 error!(%err, "Failed to build global thread pool")
171 }
172 }
173}
174
175#[derive(Debug, Clone)]
181pub struct LaunchContextWith<T> {
182 pub inner: LaunchContext,
184 pub attachment: T,
186}
187
188impl<T> LaunchContextWith<T> {
189 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
194 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
195 }
196
197 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
199 &self.inner.data_dir
200 }
201
202 pub const fn task_executor(&self) -> &TaskExecutor {
204 &self.inner.task_executor
205 }
206
207 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
209 LaunchContextWith {
210 inner: self.inner,
211 attachment: Attached::new(self.attachment, attachment),
212 }
213 }
214
215 pub fn inspect<F>(self, f: F) -> Self
218 where
219 F: FnOnce(&Self),
220 {
221 f(&self);
222 self
223 }
224}
225
226impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
227 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
229 if !self.attachment.config.network.trusted_peers.is_empty() {
230 info!(target: "reth::cli", "Adding trusted nodes");
231
232 self.attachment
233 .toml_config
234 .peers
235 .trusted_nodes
236 .extend(self.attachment.config.network.trusted_peers.clone());
237 }
238 Ok(self)
239 }
240}
241
242impl<L, R> LaunchContextWith<Attached<L, R>> {
243 pub const fn left(&self) -> &L {
245 &self.attachment.left
246 }
247
248 pub const fn right(&self) -> &R {
250 &self.attachment.right
251 }
252
253 pub const fn left_mut(&mut self) -> &mut L {
255 &mut self.attachment.left
256 }
257
258 pub const fn right_mut(&mut self) -> &mut R {
260 &mut self.attachment.right
261 }
262}
263impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
264 pub fn with_adjusted_configs(self) -> Self {
270 self.ensure_etl_datadir().with_adjusted_instance_ports()
271 }
272
273 pub fn ensure_etl_datadir(mut self) -> Self {
275 if self.toml_config_mut().stages.etl.dir.is_none() {
276 let etl_path = EtlConfig::from_datadir(self.data_dir().data_dir());
277 if etl_path.exists() {
278 if let Err(err) = fs::remove_dir_all(&etl_path) {
280 warn!(target: "reth::cli", ?etl_path, %err, "Failed to remove ETL path on launch");
281 }
282 }
283 self.toml_config_mut().stages.etl.dir = Some(etl_path);
284 }
285
286 self
287 }
288
289 pub fn with_adjusted_instance_ports(mut self) -> Self {
291 self.node_config_mut().adjust_instance_ports();
292 self
293 }
294
295 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
297 self.attachment.left()
298 }
299
300 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
302 &self.left().config
303 }
304
305 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
307 &mut self.left_mut().config
308 }
309
310 pub const fn toml_config(&self) -> &reth_config::Config {
312 &self.left().toml_config
313 }
314
315 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
317 &mut self.left_mut().toml_config
318 }
319
320 pub fn chain_spec(&self) -> Arc<ChainSpec> {
322 self.node_config().chain.clone()
323 }
324
325 pub fn genesis_hash(&self) -> B256 {
327 self.node_config().chain.genesis_hash()
328 }
329
330 pub fn chain_id(&self) -> Chain {
332 self.node_config().chain.chain()
333 }
334
335 pub const fn is_dev(&self) -> bool {
337 self.node_config().dev.dev
338 }
339
340 pub fn prune_config(&self) -> Option<PruneConfig> {
344 let Some(mut node_prune_config) = self.node_config().prune_config() else {
345 return self.toml_config().prune.clone();
347 };
348
349 node_prune_config.merge(self.toml_config().prune.clone());
351 Some(node_prune_config)
352 }
353
354 pub fn prune_modes(&self) -> PruneModes {
356 self.prune_config().map(|config| config.segments).unwrap_or_default()
357 }
358
359 pub fn pruner_builder(&self) -> PrunerBuilder {
361 PrunerBuilder::new(self.prune_config().unwrap_or_default())
362 .delete_limit(self.chain_spec().prune_delete_limit())
363 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
364 }
365
366 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
368 let default_jwt_path = self.data_dir().jwt();
369 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
370 Ok(secret)
371 }
372
373 pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
375 if let Some(interval) = self.node_config().dev.block_time {
376 MiningMode::interval(interval)
377 } else {
378 MiningMode::instant(pool)
379 }
380 }
381}
382
383impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
384where
385 DB: Database + Clone + 'static,
386 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
387{
388 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
392 where
393 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
394 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
395 {
396 let factory = ProviderFactory::new(
397 self.right().clone(),
398 self.chain_spec(),
399 StaticFileProvider::read_write(self.data_dir().static_files())?,
400 )
401 .with_prune_modes(self.prune_modes())
402 .with_static_files_metrics();
403
404 let has_receipt_pruning =
405 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
406
407 if let Some(unwind_target) = factory
410 .static_file_provider()
411 .check_consistency(&factory.provider()?, has_receipt_pruning)?
412 {
413 assert_ne!(
416 unwind_target,
417 PipelineTarget::Unwind(0),
418 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
419 );
420
421 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
422
423 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
424
425 let pipeline = PipelineBuilder::default()
427 .add_stages(DefaultStages::new(
428 factory.clone(),
429 tip_rx,
430 Arc::new(NoopConsensus::default()),
431 NoopHeaderDownloader::default(),
432 NoopBodiesDownloader::default(),
433 NoopEvmConfig::<Evm>::default(),
434 self.toml_config().stages.clone(),
435 self.prune_modes(),
436 None,
437 ))
438 .build(
439 factory.clone(),
440 StaticFileProducer::new(factory.clone(), self.prune_modes()),
441 );
442
443 let (tx, rx) = oneshot::channel();
445
446 self.task_executor().spawn_critical_blocking(
448 "pipeline task",
449 Box::pin(async move {
450 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
451 let _ = tx.send(result);
452 }),
453 );
454 rx.await?.inspect_err(|err| {
455 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
456 })?;
457 }
458
459 Ok(factory)
460 }
461
462 pub async fn with_provider_factory<N, Evm>(
464 self,
465 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
466 where
467 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
468 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
469 {
470 let factory = self.create_provider_factory::<N, Evm>().await?;
471 let ctx = LaunchContextWith {
472 inner: self.inner,
473 attachment: self.attachment.map_right(|_| factory),
474 };
475
476 Ok(ctx)
477 }
478}
479
480impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
481where
482 T: ProviderNodeTypes,
483{
484 pub const fn database(&self) -> &T::DB {
486 self.right().db_ref()
487 }
488
489 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
491 self.right()
492 }
493
494 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
496 self.right().static_file_provider()
497 }
498
499 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
503 self.start_prometheus_endpoint().await?;
504 Ok(self)
505 }
506
507 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
509 install_prometheus_recorder().spawn_upkeep();
511
512 let listen_addr = self.node_config().metrics;
513 if let Some(addr) = listen_addr {
514 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
515 let config = MetricServerConfig::new(
516 addr,
517 VersionInfo {
518 version: CARGO_PKG_VERSION,
519 build_timestamp: VERGEN_BUILD_TIMESTAMP,
520 cargo_features: VERGEN_CARGO_FEATURES,
521 git_sha: VERGEN_GIT_SHA,
522 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
523 build_profile: BUILD_PROFILE_NAME,
524 },
525 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
526 self.task_executor().clone(),
527 Hooks::builder()
528 .with_hook({
529 let db = self.database().clone();
530 move || db.report_metrics()
531 })
532 .with_hook({
533 let sfp = self.static_file_provider();
534 move || {
535 if let Err(error) = sfp.report_metrics() {
536 error!(%error, "Failed to report metrics for the static file provider");
537 }
538 }
539 })
540 .build(),
541 );
542
543 MetricServer::new(config).serve().await?;
544 }
545
546 Ok(())
547 }
548
549 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
551 init_genesis(self.provider_factory())?;
552 Ok(self)
553 }
554
555 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
557 init_genesis(self.provider_factory())
558 }
559
560 pub fn with_metrics_task(
566 self,
567 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
568 let (metrics_sender, metrics_receiver) = unbounded_channel();
569
570 let with_metrics =
571 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
572
573 debug!(target: "reth::cli", "Spawning stages metrics listener task");
574 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
575 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
576
577 LaunchContextWith {
578 inner: self.inner,
579 attachment: self.attachment.map_right(|_| with_metrics),
580 }
581 }
582}
583
584impl<N, DB>
585 LaunchContextWith<
586 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
587 >
588where
589 N: NodeTypes,
590 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
591{
592 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
594 &self.right().provider_factory
595 }
596
597 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
599 self.right().metrics_sender.clone()
600 }
601
602 #[expect(clippy::complexity)]
604 pub fn with_blockchain_db<T, F>(
605 self,
606 create_blockchain_provider: F,
607 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
608 where
609 T: FullNodeTypes<Types = N, DB = DB>,
610 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
611 {
612 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
613
614 let metered_providers = WithMeteredProviders {
615 db_provider_container: WithMeteredProvider {
616 provider_factory: self.provider_factory().clone(),
617 metrics_sender: self.sync_metrics_tx(),
618 },
619 blockchain_db,
620 };
621
622 let ctx = LaunchContextWith {
623 inner: self.inner,
624 attachment: self.attachment.map_right(|_| metered_providers),
625 };
626
627 Ok(ctx)
628 }
629}
630
631impl<T>
632 LaunchContextWith<
633 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
634 >
635where
636 T: FullNodeTypes<Types: NodeTypesForProvider>,
637{
638 pub const fn database(&self) -> &T::DB {
640 self.provider_factory().db_ref()
641 }
642
643 pub const fn provider_factory(
645 &self,
646 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
647 &self.right().db_provider_container.provider_factory
648 }
649
650 pub fn lookup_head(&self) -> eyre::Result<Head> {
654 self.node_config()
655 .lookup_head(self.provider_factory())
656 .wrap_err("the head block is missing")
657 }
658
659 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
661 self.right().db_provider_container.metrics_sender.clone()
662 }
663
664 pub const fn blockchain_db(&self) -> &T::Provider {
666 &self.right().blockchain_db
667 }
668
669 pub async fn with_components<CB>(
671 self,
672 components_builder: CB,
673 on_component_initialized: Box<
674 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
675 >,
676 ) -> eyre::Result<
677 LaunchContextWith<
678 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
679 >,
680 >
681 where
682 CB: NodeComponentsBuilder<T>,
683 {
684 let head = self.lookup_head()?;
686
687 let builder_ctx = BuilderContext::new(
688 head,
689 self.blockchain_db().clone(),
690 self.task_executor().clone(),
691 self.configs().clone(),
692 );
693
694 debug!(target: "reth::cli", "creating components");
695 let components = components_builder.build_components(&builder_ctx).await?;
696
697 let blockchain_db = self.blockchain_db().clone();
698
699 let node_adapter = NodeAdapter {
700 components,
701 task_executor: self.task_executor().clone(),
702 provider: blockchain_db,
703 };
704
705 debug!(target: "reth::cli", "calling on_component_initialized hook");
706 on_component_initialized.on_event(node_adapter.clone())?;
707
708 let components_container = WithComponents {
709 db_provider_container: WithMeteredProvider {
710 provider_factory: self.provider_factory().clone(),
711 metrics_sender: self.sync_metrics_tx(),
712 },
713 node_adapter,
714 head,
715 };
716
717 let ctx = LaunchContextWith {
718 inner: self.inner,
719 attachment: self.attachment.map_right(|_| components_container),
720 };
721
722 Ok(ctx)
723 }
724}
725
726impl<T, CB>
727 LaunchContextWith<
728 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
729 >
730where
731 T: FullNodeTypes<Types: NodeTypesForProvider>,
732 CB: NodeComponentsBuilder<T>,
733{
734 pub const fn provider_factory(
736 &self,
737 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
738 &self.right().db_provider_container.provider_factory
739 }
740
741 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
744 where
745 C: HeadersClient<Header: BlockHeader>,
746 {
747 self.node_config().max_block(client, self.provider_factory().clone()).await
748 }
749
750 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
752 self.provider_factory().static_file_provider()
753 }
754
755 pub fn static_file_producer(
757 &self,
758 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
759 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
760 }
761
762 pub const fn head(&self) -> Head {
764 self.right().head
765 }
766
767 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
769 &self.right().node_adapter
770 }
771
772 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
774 &mut self.right_mut().node_adapter
775 }
776
777 pub const fn blockchain_db(&self) -> &T::Provider {
779 &self.node_adapter().provider
780 }
781
782 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
788 let mut initial_target = self.node_config().debug.tip;
789
790 if initial_target.is_none() {
791 initial_target = self.check_pipeline_consistency()?;
792 }
793
794 Ok(initial_target)
795 }
796
797 pub const fn terminate_after_initial_backfill(&self) -> bool {
803 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
804 }
805
806 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
811 if self.chain_spec().is_optimism() &&
812 !self.is_dev() &&
813 self.chain_id() == Chain::optimism_mainnet()
814 {
815 let latest = self.blockchain_db().last_block_number()?;
816 if latest < 105235063 {
818 error!(
819 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
820 );
821 return Err(ProviderError::BestBlockNotFound)
822 }
823 }
824
825 Ok(())
826 }
827
828 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
840 let first_stage_checkpoint = self
843 .blockchain_db()
844 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
845 .unwrap_or_default()
846 .block_number;
847
848 for stage_id in StageId::ALL.iter().skip(1) {
851 let stage_checkpoint = self
852 .blockchain_db()
853 .get_stage_checkpoint(*stage_id)?
854 .unwrap_or_default()
855 .block_number;
856
857 if stage_checkpoint < first_stage_checkpoint {
860 debug!(
861 target: "consensus::engine",
862 first_stage_checkpoint,
863 inconsistent_stage_id = %stage_id,
864 inconsistent_stage_checkpoint = stage_checkpoint,
865 "Pipeline sync progress is inconsistent"
866 );
867 return self.blockchain_db().block_hash(first_stage_checkpoint);
868 }
869 }
870
871 self.ensure_chain_specific_db_checks()?;
872
873 Ok(None)
874 }
875
876 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
878 self.right().db_provider_container.metrics_sender.clone()
879 }
880
881 pub const fn components(&self) -> &CB::Components {
883 &self.node_adapter().components
884 }
885}
886
887impl<T, CB>
888 LaunchContextWith<
889 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
890 >
891where
892 T: FullNodeTypes<
893 Provider: StateProviderFactory + ChainSpecProvider,
894 Types: NodeTypesForProvider,
895 >,
896 CB: NodeComponentsBuilder<T>,
897{
898 pub fn invalid_block_hook(
900 &self,
901 ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
902 let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
903 return Ok(Box::new(NoopInvalidBlockHook::default()))
904 };
905 let healthy_node_rpc_client = self.get_healthy_node_client()?;
906
907 let output_directory = self.data_dir().invalid_block_hooks();
908 let hooks = hook
909 .iter()
910 .copied()
911 .map(|hook| {
912 let output_directory = output_directory.join(hook.to_string());
913 fs::create_dir_all(&output_directory)?;
914
915 Ok(match hook {
916 InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
917 self.blockchain_db().clone(),
918 self.components().evm_config().clone(),
919 output_directory,
920 healthy_node_rpc_client.clone(),
921 )),
922 InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
923 eyre::bail!("invalid block hook {hook:?} is not implemented yet")
924 }
925 } as Box<dyn InvalidBlockHook<_>>)
926 })
927 .collect::<Result<_, _>>()?;
928
929 Ok(Box::new(InvalidBlockHooks(hooks)))
930 }
931
932 fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
934 self.node_config()
935 .debug
936 .healthy_node_rpc_url
937 .as_ref()
938 .map(|url| {
939 let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
940
941 let chain_id = futures::executor::block_on(async {
943 EthApiClient::<
944 alloy_rpc_types::Transaction,
945 alloy_rpc_types::Block,
946 alloy_rpc_types::Receipt,
947 alloy_rpc_types::Header,
948 >::chain_id(&client)
949 .await
950 })?
951 .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
952 if chain_id.to::<u64>() != self.chain_id().id() {
953 eyre::bail!("invalid chain id for healthy node: {chain_id}")
954 }
955
956 Ok(client)
957 })
958 .transpose()
959 }
960}
961
962#[derive(Clone, Copy, Debug)]
964pub struct Attached<L, R> {
965 left: L,
966 right: R,
967}
968
969impl<L, R> Attached<L, R> {
970 pub const fn new(left: L, right: R) -> Self {
972 Self { left, right }
973 }
974
975 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
977 where
978 F: FnOnce(L) -> T,
979 {
980 Attached::new(f(self.left), self.right)
981 }
982
983 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
985 where
986 F: FnOnce(R) -> T,
987 {
988 Attached::new(self.left, f(self.right))
989 }
990
991 pub const fn left(&self) -> &L {
993 &self.left
994 }
995
996 pub const fn right(&self) -> &R {
998 &self.right
999 }
1000
1001 pub const fn left_mut(&mut self) -> &mut R {
1003 &mut self.right
1004 }
1005
1006 pub const fn right_mut(&mut self) -> &mut R {
1008 &mut self.right
1009 }
1010}
1011
1012#[derive(Debug)]
1015pub struct WithConfigs<ChainSpec> {
1016 pub config: NodeConfig<ChainSpec>,
1018 pub toml_config: reth_config::Config,
1020}
1021
1022impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1023 fn clone(&self) -> Self {
1024 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1025 }
1026}
1027
1028#[derive(Debug, Clone)]
1031pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1032 provider_factory: ProviderFactory<N>,
1033 metrics_sender: UnboundedSender<MetricEvent>,
1034}
1035
1036#[expect(missing_debug_implementations)]
1039pub struct WithMeteredProviders<T>
1040where
1041 T: FullNodeTypes,
1042{
1043 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1044 blockchain_db: T::Provider,
1045}
1046
1047#[expect(missing_debug_implementations)]
1049pub struct WithComponents<T, CB>
1050where
1051 T: FullNodeTypes,
1052 CB: NodeComponentsBuilder<T>,
1053{
1054 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1055 node_adapter: NodeAdapter<T, CB::Components>,
1056 head: Head,
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use super::{LaunchContext, NodeConfig};
1062 use reth_config::Config;
1063 use reth_node_core::args::PruningArgs;
1064
1065 const EXTENSION: &str = "toml";
1066
1067 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1068 let temp_dir = tempfile::tempdir().unwrap();
1069 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1070 proc(&config_path);
1071 temp_dir.close().unwrap()
1072 }
1073
1074 #[test]
1075 fn test_save_prune_config() {
1076 with_tempdir("prune-store-test", |config_path| {
1077 let mut reth_config = Config::default();
1078 let node_config = NodeConfig {
1079 pruning: PruningArgs {
1080 full: true,
1081 block_interval: None,
1082 sender_recovery_full: false,
1083 sender_recovery_distance: None,
1084 sender_recovery_before: None,
1085 transaction_lookup_full: false,
1086 transaction_lookup_distance: None,
1087 transaction_lookup_before: None,
1088 receipts_full: false,
1089 receipts_distance: None,
1090 receipts_before: None,
1091 account_history_full: false,
1092 account_history_distance: None,
1093 account_history_before: None,
1094 storage_history_full: false,
1095 storage_history_distance: None,
1096 storage_history_before: None,
1097 receipts_log_filter: None,
1098 },
1099 ..NodeConfig::test()
1100 };
1101 LaunchContext::save_pruning_config_if_full_node(
1102 &mut reth_config,
1103 &node_config,
1104 config_path,
1105 )
1106 .unwrap();
1107
1108 let loaded_config = Config::from_path(config_path).unwrap();
1109
1110 assert_eq!(reth_config, loaded_config);
1111 })
1112 }
1113}