use std::{sync::Arc, thread::available_parallelism};
use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::OnComponentInitializedHook,
BuilderContext, NodeAdapter,
};
use alloy_primitives::{BlockNumber, B256};
use eyre::{Context, OptionExt};
use rayon::ThreadPoolBuilder;
use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
use reth_config::{config::EtlConfig, PruneConfig};
use reth_consensus::noop::NoopConsensus;
use reth_db_api::{
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetrics},
};
use reth_db_common::init::{init_genesis, InitStorageError};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_engine_local::MiningMode;
use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_fs_util as fs;
use reth_invalid_block_hooks::InvalidBlockWitnessHook;
use reth_network_p2p::headers::client::HeadersClient;
use reth_node_api::{
FullNodePrimitives, FullNodeTypes, NodePrimitives, NodeTypes, NodeTypesWithDB,
NodeTypesWithDBAdapter,
};
use reth_node_core::{
args::InvalidBlockHookType,
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
primitives::BlockHeader,
version::{
BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
},
};
use reth_node_metrics::{
chain::ChainSpecInfo,
hooks::Hooks,
recorder::install_prometheus_recorder,
server::{MetricServer, MetricServerConfig},
version::VersionInfo,
};
use reth_primitives::{Head, TransactionSigned};
use reth_provider::{
providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_api::clients::EthApiClient;
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
use reth_transaction_pool::TransactionPool;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot, watch,
};
/// Context shared by all launch flows of a node.
///
/// Bundles the executor used to spawn node tasks with the resolved
/// chain data directory.
#[derive(Debug, Clone)]
pub struct LaunchContext {
    /// Executor used to spawn the node's background tasks.
    pub task_executor: TaskExecutor,
    /// The chain's data directory (config file, static files, jwt, ...).
    pub data_dir: ChainPath<DataDirPath>,
}
impl LaunchContext {
    /// Create a new launch context from a task executor and a chain data directory.
    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
        Self { task_executor, data_dir }
    }

    /// Attach a database to this context, producing a [`LaunchContextWith`].
    pub const fn with<DB>(self, database: DB) -> LaunchContextWith<DB> {
        LaunchContextWith { inner: self, attachment: database }
    }

    /// Loads the TOML config for the given node config and attaches both
    /// configs to the context.
    pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
        self,
        config: NodeConfig<ChainSpec>,
    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
        let toml_config = self.load_toml_config(&config)?;
        Ok(self.with(WithConfigs { config, toml_config }))
    }

    /// Loads the TOML config from the configured path (falling back to the
    /// datadir default) and applies overrides from the node config.
    pub fn load_toml_config<ChainSpec: EthChainSpec>(
        &self,
        config: &NodeConfig<ChainSpec>,
    ) -> eyre::Result<reth_config::Config> {
        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());

        let mut loaded = reth_config::Config::from_path(&config_path)
            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;

        Self::save_pruning_config_if_full_node(&mut loaded, config, &config_path)?;

        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");

        // Propagate the node-config peering restriction into the loaded config.
        loaded.peers.trusted_nodes_only = config.network.trusted_only;

        Ok(loaded)
    }

    /// Persists the prune config derived from the node config when the file has
    /// none; warns when the file prunes but the node config does not.
    fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
        reth_config: &mut reth_config::Config,
        config: &NodeConfig<ChainSpec>,
        config_path: impl AsRef<std::path::Path>,
    ) -> eyre::Result<()> {
        match (reth_config.prune.is_none(), config.prune_config()) {
            // file has no prune section but the node config produces one: persist it
            (true, Some(prune_config)) => {
                reth_config.update_prune_config(prune_config);
                info!(target: "reth::cli", "Saving prune config to toml file");
                reth_config.save(config_path.as_ref())?;
            }
            // file prunes but the node config does not: run unpruned, just warn
            (false, None) => {
                warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
            }
            _ => {}
        }
        Ok(())
    }

    /// Convenience wrapper: apply global process settings and return `self`.
    pub fn with_configured_globals(self) -> Self {
        self.configure_globals();
        self
    }

    /// Configure process-wide settings: raise the file descriptor limit and
    /// size the global rayon thread pool.
    pub fn configure_globals(&self) {
        // Raise the fd limit of the process; failures are logged, not fatal.
        match fdlimit::raise_fd_limit() {
            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
                debug!(from, to, "Raised file descriptor limit");
            }
            Ok(fdlimit::Outcome::Unsupported) => {}
            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
        }

        // Leave one core for the rest of the system; on error we pass 0,
        // which rayon documents as "choose automatically".
        let worker_threads =
            available_parallelism().map_or(0, |cores| cores.get().saturating_sub(1).max(1));
        let pool = ThreadPoolBuilder::new()
            .num_threads(worker_threads)
            .thread_name(|idx| format!("reth-rayon-{idx}"))
            .build_global();
        if let Err(err) = pool {
            error!(%err, "Failed to build global thread pool")
        }
    }
}
/// A [`LaunchContext`] that carries an additional attached value.
///
/// Launch flows thread configuration and handles through the context by
/// repeatedly attaching new values (see [`LaunchContextWith::attach`]).
#[derive(Debug, Clone)]
pub struct LaunchContextWith<T> {
    /// The wrapped launch context.
    pub inner: LaunchContext,
    /// The value attached to this context.
    pub attachment: T,
}
impl<T> LaunchContextWith<T> {
    /// Configure global process settings (fd limit, rayon pool) via the inner context.
    pub fn configure_globals(&self) {
        self.inner.configure_globals();
    }

    /// Returns the chain data directory.
    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
        &self.inner.data_dir
    }

    /// Returns the task executor.
    pub const fn task_executor(&self) -> &TaskExecutor {
        &self.inner.task_executor
    }

    /// Attach another value, pairing it with the current attachment.
    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
        let Self { inner, attachment: current } = self;
        LaunchContextWith { inner, attachment: Attached::new(current, attachment) }
    }

    /// Run a closure against a reference to this context, then return the
    /// context unchanged. Useful for side effects mid-chain.
    pub fn inspect<F>(self, f: F) -> Self
    where
        F: FnOnce(&Self),
    {
        f(&self);
        self
    }
}
impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
    /// Merges the trusted peers from the node config into the TOML peer
    /// configuration, so both sources end up in one peer set.
    pub async fn with_resolved_peers(mut self) -> eyre::Result<Self> {
        let configured_peers = &self.attachment.config.network.trusted_peers;
        if !configured_peers.is_empty() {
            info!(target: "reth::cli", "Adding trusted nodes");
            let extra = configured_peers.clone();
            self.attachment.toml_config.peers.trusted_nodes.extend(extra);
        }
        Ok(self)
    }
}
impl<L, R> LaunchContextWith<Attached<L, R>> {
    /// Shared reference to the left (first attached) value.
    pub const fn left(&self) -> &L {
        self.attachment.left()
    }

    /// Shared reference to the right (second attached) value.
    pub const fn right(&self) -> &R {
        self.attachment.right()
    }

    /// Exclusive reference to the left (first attached) value.
    pub fn left_mut(&mut self) -> &mut L {
        &mut self.attachment.left
    }

    /// Exclusive reference to the right (second attached) value.
    pub fn right_mut(&mut self) -> &mut R {
        &mut self.attachment.right
    }
}
impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
    /// Adjust the loaded configuration to the environment: pin the ETL
    /// directory inside the datadir and shift ports by the instance number.
    pub fn with_adjusted_configs(self) -> Self {
        self.ensure_etl_datadir().with_adjusted_instance_ports()
    }

    /// Make sure the ETL stage writes under the node's datadir rather than a
    /// global temporary directory.
    pub fn ensure_etl_datadir(mut self) -> Self {
        if self.toml_config_mut().stages.etl.dir.is_none() {
            let dir = EtlConfig::from_datadir(self.data_dir().data_dir());
            self.toml_config_mut().stages.etl.dir = Some(dir);
        }
        self
    }

    /// Shift the configured ports according to the instance number.
    pub fn with_adjusted_instance_ports(mut self) -> Self {
        self.node_config_mut().adjust_instance_ports();
        self
    }

    /// Both the node config and the loaded TOML config.
    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
        self.attachment.left()
    }

    /// The node configuration.
    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
        &self.left().config
    }

    /// Mutable access to the node configuration.
    pub fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
        &mut self.left_mut().config
    }

    /// The configuration loaded from the TOML file.
    pub const fn toml_config(&self) -> &reth_config::Config {
        &self.left().toml_config
    }

    /// Mutable access to the configuration loaded from the TOML file.
    pub fn toml_config_mut(&mut self) -> &mut reth_config::Config {
        &mut self.left_mut().toml_config
    }

    /// A cheap clone of the chain spec handle.
    pub fn chain_spec(&self) -> Arc<ChainSpec> {
        self.node_config().chain.clone()
    }

    /// Hash of the genesis block of the configured chain.
    pub fn genesis_hash(&self) -> B256 {
        self.node_config().chain.genesis_hash()
    }

    /// The chain identifier.
    pub fn chain_id(&self) -> Chain {
        self.node_config().chain.chain()
    }

    /// True when the node is configured to run in dev mode.
    pub const fn is_dev(&self) -> bool {
        self.node_config().dev.dev
    }

    /// The effective prune configuration: the node config takes precedence and
    /// is merged over whatever the TOML file provides; falls back to the TOML
    /// prune section when the node config has none.
    pub fn prune_config(&self) -> Option<PruneConfig> {
        match self.node_config().prune_config() {
            Some(mut merged) => {
                merged.merge(self.toml_config().prune.clone());
                Some(merged)
            }
            None => self.toml_config().prune.clone(),
        }
    }

    /// The configured prune segment modes; default (empty) when pruning is not
    /// configured at all.
    pub fn prune_modes(&self) -> PruneModes {
        self.prune_config().map(|config| config.segments).unwrap_or_default()
    }

    /// A [`PrunerBuilder`] preconfigured from the effective prune settings.
    pub fn pruner_builder(&self) -> PrunerBuilder {
        PrunerBuilder::new(self.prune_config().unwrap_or_default())
            .delete_limit(self.chain_spec().prune_delete_limit())
            .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
    }

    /// Loads the JWT secret for the authenticated RPC server from the
    /// configured location, defaulting to the datadir's jwt path.
    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
        let default_jwt_path = self.data_dir().jwt();
        Ok(self.node_config().rpc.auth_jwt_secret(default_jwt_path)?)
    }

    /// The dev-chain mining mode: interval mining when a block time is
    /// configured, otherwise mine instantly on incoming transactions.
    pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
        match self.node_config().dev.block_time {
            Some(interval) => MiningMode::interval(interval),
            None => MiningMode::instant(pool),
        }
    }
}
impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
where
    DB: Database + Clone + 'static,
    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
{
    /// Creates the [`ProviderFactory`] for the attached database and verifies that the
    /// database and static files are consistent with each other.
    ///
    /// If the consistency check reports an unwind target, a one-off pipeline built from
    /// noop components is run to unwind the storage to that target before the factory
    /// is returned.
    pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
    {
        let factory = ProviderFactory::new(
            self.right().clone(),
            self.chain_spec(),
            StaticFileProvider::read_write(self.data_dir().static_files())?,
        )
        .with_prune_modes(self.prune_modes())
        .with_static_files_metrics();

        // Whether receipts are pruned changes what the consistency check may expect to find.
        let has_receipt_pruning =
            self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());

        // Check consistency between the database and static files; `Some` means an
        // unwind to the returned target is required to restore consistency.
        if let Some(unwind_target) = factory
            .static_file_provider()
            .check_consistency(&factory.provider()?, has_receipt_pruning)?
        {
            // An unwind all the way to genesis would wipe the database — treat it as a bug.
            assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");

            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");

            // The tip channel is only needed to satisfy the stage-set constructor; the
            // sender is never used since we only unwind.
            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

            // Unwind-only pipeline: downloaders/executor are noops because no forward
            // sync happens here.
            let pipeline = PipelineBuilder::default()
                .add_stages(DefaultStages::new(
                    factory.clone(),
                    tip_rx,
                    Arc::new(NoopConsensus::default()),
                    NoopHeaderDownloader::default(),
                    NoopBodiesDownloader::default(),
                    NoopBlockExecutorProvider::<N::Primitives>::default(),
                    self.toml_config().stages.clone(),
                    self.prune_modes(),
                ))
                .build(
                    factory.clone(),
                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
                );

            // Run the pipeline on a blocking task and await its result via a oneshot.
            let (tx, rx) = oneshot::channel();

            self.task_executor().spawn_critical_blocking(
                "pipeline task",
                Box::pin(async move {
                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
                    let _ = tx.send(result);
                }),
            );
            rx.await??;
        }

        Ok(factory)
    }

    /// Creates the [`ProviderFactory`] (including the consistency check above) and
    /// attaches it to the context, replacing the raw database attachment.
    pub async fn with_provider_factory<N>(
        self,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
    where
        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
        N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
    {
        let factory = self.create_provider_factory().await?;
        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| factory),
        };

        Ok(ctx)
    }
}
impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
where
    T: ProviderNodeTypes,
{
    /// Returns access to the underlying database.
    pub const fn database(&self) -> &T::DB {
        self.right().db_ref()
    }

    /// Returns the attached [`ProviderFactory`].
    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
        self.right()
    }

    /// Returns the static file provider for interacting with static files.
    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
        self.right().static_file_provider()
    }

    /// Starts the prometheus endpoint (if a metrics address is configured) and
    /// returns the context unchanged.
    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
        self.start_prometheus_endpoint().await?;
        Ok(self)
    }

    /// Starts the prometheus endpoint when a metrics address is configured,
    /// registering hooks that report database and static-file metrics on scrape.
    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
        // Spawn the recorder's periodic upkeep task.
        install_prometheus_recorder().spawn_upkeep();

        let listen_addr = self.node_config().metrics;
        if let Some(addr) = listen_addr {
            info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
            let config = MetricServerConfig::new(
                addr,
                // Build/version info exposed alongside the metrics.
                VersionInfo {
                    version: CARGO_PKG_VERSION,
                    build_timestamp: VERGEN_BUILD_TIMESTAMP,
                    cargo_features: VERGEN_CARGO_FEATURES,
                    git_sha: VERGEN_GIT_SHA,
                    target_triple: VERGEN_CARGO_TARGET_TRIPLE,
                    build_profile: BUILD_PROFILE_NAME,
                },
                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
                self.task_executor().clone(),
                Hooks::builder()
                    .with_hook({
                        // Report database-level metrics.
                        let db = self.database().clone();
                        move || db.report_metrics()
                    })
                    .with_hook({
                        // Report static-file metrics; failures are logged, not fatal.
                        let sfp = self.static_file_provider();
                        move || {
                            if let Err(error) = sfp.report_metrics() {
                                error!(%error, "Failed to report metrics for the static file provider");
                            }
                        }
                    })
                    .build(),
            );

            MetricServer::new(config).serve().await?;
        }

        Ok(())
    }

    /// Writes the genesis block/state via [`init_genesis`] and returns the
    /// context unchanged.
    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
        init_genesis(self.provider_factory())?;
        Ok(self)
    }

    /// Writes the genesis block/state, returning the genesis hash.
    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
        init_genesis(self.provider_factory())
    }

    /// Spawns the stages metrics listener task and replaces the plain
    /// [`ProviderFactory`] attachment with a [`WithMeteredProvider`] that carries
    /// the metrics sender.
    pub fn with_metrics_task(
        self,
    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
        let (metrics_sender, metrics_receiver) = unbounded_channel();

        let with_metrics =
            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };

        debug!(target: "reth::cli", "Spawning stages metrics listener task");
        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);

        LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| with_metrics),
        }
    }
}
impl<N, DB>
    LaunchContextWith<
        Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
    >
where
    N: NodeTypes,
    DB: Database + DatabaseMetrics + DatabaseMetadata + Clone + Unpin + 'static,
{
    /// Returns the attached [`ProviderFactory`].
    const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
        &self.right().provider_factory
    }

    /// Returns a clone of the sender for stage metric events.
    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().metrics_sender.clone()
    }

    /// Creates the blockchain provider via the given closure and attaches it,
    /// together with the metered provider factory, as [`WithMeteredProviders`].
    #[allow(clippy::complexity)]
    pub fn with_blockchain_db<T, F>(
        self,
        create_blockchain_provider: F,
    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
    where
        T: FullNodeTypes<Types = N, DB = DB>,
        F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
    {
        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;

        let metered_providers = WithMeteredProviders {
            db_provider_container: WithMeteredProvider {
                provider_factory: self.provider_factory().clone(),
                metrics_sender: self.sync_metrics_tx(),
            },
            blockchain_db,
        };

        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| metered_providers),
        };

        Ok(ctx)
    }
}
impl<T>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
    >
where
    T: FullNodeTypes<Types: NodeTypesForProvider>,
{
    /// Returns access to the underlying database.
    pub const fn database(&self) -> &T::DB {
        self.provider_factory().db_ref()
    }

    /// Returns the attached [`ProviderFactory`].
    pub const fn provider_factory(
        &self,
    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
        &self.right().db_provider_container.provider_factory
    }

    /// Looks up the head block via the node config.
    ///
    /// # Errors
    /// Returns an error if the head block cannot be found in storage.
    pub fn lookup_head(&self) -> eyre::Result<Head> {
        self.node_config()
            .lookup_head(self.provider_factory())
            .wrap_err("the head block is missing")
    }

    /// Returns a clone of the sender for stage metric events.
    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().db_provider_container.metrics_sender.clone()
    }

    /// Returns a reference to the blockchain provider.
    pub const fn blockchain_db(&self) -> &T::Provider {
        &self.right().blockchain_db
    }

    /// Builds the node components with the given builder, invokes the
    /// `on_component_initialized` hook with the resulting [`NodeAdapter`], and
    /// attaches everything as [`WithComponents`].
    pub async fn with_components<CB>(
        self,
        components_builder: CB,
        on_component_initialized: Box<
            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
        >,
    ) -> eyre::Result<
        LaunchContextWith<
            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
        >,
    >
    where
        CB: NodeComponentsBuilder<T>,
    {
        // Fetch the head block from the database first; the builder context needs it.
        let head = self.lookup_head()?;

        let builder_ctx = BuilderContext::new(
            head,
            self.blockchain_db().clone(),
            self.task_executor().clone(),
            self.configs().clone(),
        );

        debug!(target: "reth::cli", "creating components");
        let components = components_builder.build_components(&builder_ctx).await?;

        let blockchain_db = self.blockchain_db().clone();

        let node_adapter = NodeAdapter {
            components,
            task_executor: self.task_executor().clone(),
            provider: blockchain_db,
        };

        debug!(target: "reth::cli", "calling on_component_initialized hook");
        on_component_initialized.on_event(node_adapter.clone())?;

        let components_container = WithComponents {
            db_provider_container: WithMeteredProvider {
                provider_factory: self.provider_factory().clone(),
                metrics_sender: self.sync_metrics_tx(),
            },
            node_adapter,
            head,
        };

        let ctx = LaunchContextWith {
            inner: self.inner,
            attachment: self.attachment.map_right(|_| components_container),
        };

        Ok(ctx)
    }
}
impl<T, CB>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
    >
where
    T: FullNodeTypes<Types: NodeTypesForProvider>,
    CB: NodeComponentsBuilder<T>,
{
    /// Returns the attached [`ProviderFactory`].
    pub const fn provider_factory(
        &self,
    ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
        &self.right().db_provider_container.provider_factory
    }

    /// Resolves the max block the node should sync to, as determined by the
    /// node config (see `NodeConfig::max_block`), possibly using the given
    /// headers client.
    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
    where
        C: HeadersClient<Header: BlockHeader>,
    {
        self.node_config().max_block(client, self.provider_factory().clone()).await
    }

    /// Returns the static file provider for interacting with static files.
    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
        self.provider_factory().static_file_provider()
    }

    /// Creates a new [`StaticFileProducer`] using the attached provider factory
    /// and the effective prune modes.
    pub fn static_file_producer(
        &self,
    ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
    }

    /// Returns the head block resolved at component-build time.
    pub const fn head(&self) -> Head {
        self.right().head
    }

    /// Returns the configured [`NodeAdapter`].
    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
        &self.right().node_adapter
    }

    /// Returns mutable access to the configured [`NodeAdapter`].
    pub fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
        &mut self.right_mut().node_adapter
    }

    /// Returns a reference to the blockchain provider.
    pub const fn blockchain_db(&self) -> &T::Provider {
        &self.node_adapter().provider
    }

    /// Returns the initial backfill target: the debug tip when configured,
    /// otherwise whatever the pipeline consistency check reports.
    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
        let mut initial_target = self.node_config().debug.tip;

        if initial_target.is_none() {
            initial_target = self.check_pipeline_consistency()?;
        }

        Ok(initial_target)
    }

    /// True when the node should terminate after the initial backfill run,
    /// i.e. when debug `terminate` is set or a debug `max_block` is configured.
    pub const fn terminate_after_initial_backfill(&self) -> bool {
        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
    }

    /// Chain-specific database sanity checks.
    ///
    /// For OP mainnet outside dev mode, refuses to run when the pre-Bedrock state has
    /// not been imported, since the chain cannot progress without it (see the linked
    /// docs in the error message).
    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
        if self.chain_spec().is_optimism() &&
            !self.is_dev() &&
            self.chain_id() == Chain::optimism_mainnet()
        {
            let latest = self.blockchain_db().last_block_number()?;
            // Below this height the pre-Bedrock state is missing — refuse to start.
            if latest < 105235063 {
                error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended");
                return Err(ProviderError::BestBlockNotFound)
            }
        }

        Ok(())
    }

    /// Checks whether all stage checkpoints are consistent with the first stage's
    /// checkpoint.
    ///
    /// If any later stage lags behind the first stage, returns the hash of the first
    /// stage's checkpoint block so it can be used as a sync target; otherwise runs
    /// the chain-specific checks and returns `None`.
    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
        // Checkpoint of the first stage is the reference all others are compared against.
        let first_stage_checkpoint = self
            .blockchain_db()
            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
            .unwrap_or_default()
            .block_number;

        // Skip the first stage — it's the reference itself.
        for stage_id in StageId::ALL.iter().skip(1) {
            let stage_checkpoint = self
                .blockchain_db()
                .get_stage_checkpoint(*stage_id)?
                .unwrap_or_default()
                .block_number;

            // Any stage behind the first stage means the pipeline is inconsistent:
            // return the first stage's checkpoint block hash as the target.
            if stage_checkpoint < first_stage_checkpoint {
                debug!(
                    target: "consensus::engine",
                    first_stage_checkpoint,
                    inconsistent_stage_id = %stage_id,
                    inconsistent_stage_checkpoint = stage_checkpoint,
                    "Pipeline sync progress is inconsistent"
                );
                return self.blockchain_db().block_hash(first_stage_checkpoint);
            }
        }

        self.ensure_chain_specific_db_checks()?;

        Ok(None)
    }

    /// Returns a clone of the sender for stage metric events.
    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
        self.right().db_provider_container.metrics_sender.clone()
    }

    /// Returns a reference to the built node components.
    pub const fn components(&self) -> &CB::Components {
        &self.node_adapter().components
    }
}
impl<T, CB>
    LaunchContextWith<
        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
    >
where
    T: FullNodeTypes<
        Provider: StateProviderFactory + ChainSpecProvider,
        Types: NodeTypesForProvider<Primitives: NodePrimitives<SignedTx = TransactionSigned>>,
    >,
    CB: NodeComponentsBuilder<T>,
{
    /// Builds the invalid-block hook(s) configured in the debug settings.
    ///
    /// Returns a noop hook when none are configured. Each configured hook gets its own
    /// output directory under the datadir's invalid-block-hooks folder. Only the
    /// `Witness` hook is implemented; `PreState`/`Opcode` bail with an error.
    pub fn invalid_block_hook(
        &self,
    ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
        let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
            return Ok(Box::new(NoopInvalidBlockHook::default()))
        };
        let healthy_node_rpc_client = self.get_healthy_node_client()?;

        let output_directory = self.data_dir().invalid_block_hooks();
        let hooks = hook
            .iter()
            .copied()
            .map(|hook| {
                // One subdirectory per hook kind.
                let output_directory = output_directory.join(hook.to_string());
                fs::create_dir_all(&output_directory)?;

                Ok(match hook {
                    InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
                        self.blockchain_db().clone(),
                        self.components().evm_config().clone(),
                        output_directory,
                        healthy_node_rpc_client.clone(),
                    )),
                    InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
                        eyre::bail!("invalid block hook {hook:?} is not implemented yet")
                    }
                } as Box<dyn InvalidBlockHook<_>>)
            })
            .collect::<Result<_, _>>()?;

        Ok(Box::new(InvalidBlockHooks(hooks)))
    }

    /// Builds an RPC client to the configured "healthy node" (if any), verifying that
    /// it reports the same chain id as this node before returning it.
    fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
        self.node_config()
            .debug
            .healthy_node_rpc_url
            .as_ref()
            .map(|url| {
                let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;

                // Verify that the healthy node is running the same chain as we are.
                // NOTE: this blocks on an async call from sync context.
                let chain_id = futures::executor::block_on(async {
                    EthApiClient::<
                        alloy_rpc_types::Transaction,
                        alloy_rpc_types::Block,
                        alloy_rpc_types::Receipt,
                        alloy_rpc_types::Header,
                    >::chain_id(&client)
                    .await
                })?
                .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
                if chain_id.to::<u64>() != self.chain_id().id() {
                    eyre::bail!("invalid chain id for healthy node: {chain_id}")
                }

                Ok(client)
            })
            .transpose()
    }
}
/// Joins two values together, addressable as `left` and `right`.
///
/// Used by the launch context to accumulate attachments pairwise.
#[derive(Clone, Copy, Debug)]
pub struct Attached<L, R> {
    left: L,
    right: R,
}

impl<L, R> Attached<L, R> {
    /// Creates a new pair from two values.
    pub const fn new(left: L, right: R) -> Self {
        Self { left, right }
    }

    /// Maps the left value to a new value, keeping the right value.
    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
    where
        F: FnOnce(L) -> T,
    {
        Attached::new(f(self.left), self.right)
    }

    /// Maps the right value to a new value, keeping the left value.
    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
    where
        F: FnOnce(R) -> T,
    {
        Attached::new(self.left, f(self.right))
    }

    /// Get a reference to the left value.
    pub const fn left(&self) -> &L {
        &self.left
    }

    /// Get a reference to the right value.
    pub const fn right(&self) -> &R {
        &self.right
    }

    /// Get a mutable reference to the left value.
    ///
    /// Fixed: this previously was declared `-> &mut R` and returned
    /// `&mut self.right` (copy-paste defect), so "left_mut" mutated the
    /// *right* value. It now correctly exposes the left value.
    pub fn left_mut(&mut self) -> &mut L {
        &mut self.left
    }

    /// Get a mutable reference to the right value.
    pub fn right_mut(&mut self) -> &mut R {
        &mut self.right
    }
}
/// Joins the node configuration with the configuration loaded from the TOML file.
#[derive(Debug, Clone)]
pub struct WithConfigs<ChainSpec> {
    /// The node configuration.
    pub config: NodeConfig<ChainSpec>,
    /// The configuration loaded from the TOML file (e.g. `reth.toml`).
    pub toml_config: reth_config::Config,
}
/// Joins a [`ProviderFactory`] with the sender used to emit stage metric events.
#[derive(Debug, Clone)]
pub struct WithMeteredProvider<N: NodeTypesWithDB> {
    // The factory for database/static-file providers.
    provider_factory: ProviderFactory<N>,
    // Sends stage metric events to the metrics listener task.
    metrics_sender: UnboundedSender<MetricEvent>,
}
/// Joins the metered provider factory with the blockchain provider.
#[allow(missing_debug_implementations)]
pub struct WithMeteredProviders<T>
where
    T: FullNodeTypes,
{
    // Provider factory plus the stage-metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    // The blockchain provider built on top of the factory.
    blockchain_db: T::Provider,
}
/// Joins the metered providers with the built node components and the resolved head.
#[allow(missing_debug_implementations)]
pub struct WithComponents<T, CB>
where
    T: FullNodeTypes,
    CB: NodeComponentsBuilder<T>,
{
    // Provider factory plus the stage-metrics sender.
    db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
    // Provider + components + task executor bundle.
    node_adapter: NodeAdapter<T, CB::Components>,
    // Head block resolved when the components were built.
    head: Head,
}
#[cfg(test)]
mod tests {
    use super::{LaunchContext, NodeConfig};
    use reth_config::Config;
    use reth_node_core::args::PruningArgs;

    const EXTENSION: &str = "toml";

    /// Runs `proc` with a `<filename>.toml` path inside a fresh temporary
    /// directory and removes the directory afterwards.
    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(filename).with_extension(EXTENSION);
        proc(&path);
        dir.close().unwrap()
    }

    #[test]
    fn test_save_prune_config() {
        with_tempdir("prune-store-test", |config_path| {
            let mut stored_config = Config::default();

            // Pruning args that produce a prune config (full node pruning).
            let pruning = PruningArgs {
                full: true,
                block_interval: None,
                sender_recovery_full: false,
                sender_recovery_distance: None,
                sender_recovery_before: None,
                transaction_lookup_full: false,
                transaction_lookup_distance: None,
                transaction_lookup_before: None,
                receipts_full: false,
                receipts_distance: None,
                receipts_before: None,
                account_history_full: false,
                account_history_distance: None,
                account_history_before: None,
                storage_history_full: false,
                storage_history_distance: None,
                storage_history_before: None,
                receipts_log_filter: vec![],
            };
            let node_config = NodeConfig { pruning, ..NodeConfig::test() };

            LaunchContext::save_pruning_config_if_full_node(
                &mut stored_config,
                &node_config,
                config_path,
            )
            .unwrap();

            // The in-memory config must round-trip through the file on disk.
            let loaded_config = Config::from_path(config_path).unwrap();
            assert_eq!(stored_config, loaded_config);
        })
    }
}