use futures::{future::Either, stream, stream_select, StreamExt};
use reth_beacon_consensus::{
hooks::{EngineHooks, StaticFileHook},
BeaconConsensusEngineHandle,
};
use reth_blockchain_tree::BlockchainTreeConfig;
use reth_chainspec::EthChainSpec;
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider};
use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
engine::{EngineApiRequest, EngineRequestHandler},
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{NetworkSyncUpdater, SyncState};
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_node_api::{
BuiltPayload, FullNodeTypes, NodeTypesWithEngine, PayloadAttributesBuilder, PayloadTypes,
};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture,
primitives::Head,
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_payload_primitives::PayloadBuilder;
use reth_primitives::EthereumHardforks;
use reth_provider::providers::{BlockchainProvider2, ProviderNodeTypes};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
rpc::{RethRpcAddOns, RpcHandle},
setup::build_networked_pipeline,
AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
};
/// Launcher that starts a node through `launch_node` (see the `LaunchNode` impl in this file).
///
/// Holds the launch context and the engine tree configuration that is handed to the engine
/// service when the node is launched.
#[derive(Debug)]
pub struct EngineNodeLauncher {
/// Launch context; `new` builds it from a task executor and a data directory.
pub ctx: LaunchContext,
/// Engine tree configuration, forwarded to whichever engine service `launch_node` creates.
pub engine_tree_config: TreeConfig,
}
impl EngineNodeLauncher {
    /// Creates a launcher from its parts.
    ///
    /// `task_executor` and `data_dir` are used to construct the [`LaunchContext`] stored in
    /// `ctx`; `engine_tree_config` is stored unchanged.
    pub const fn new(
        task_executor: TaskExecutor,
        data_dir: ChainPath<DataDirPath>,
        engine_tree_config: TreeConfig,
    ) -> Self {
        // Build the context first so the struct literal can use field-init shorthand.
        let ctx = LaunchContext::new(task_executor, data_dir);
        Self { ctx, engine_tree_config }
    }
}
/// [`LaunchNode`] implementation that drives the node with an engine service.
///
/// In dev mode (`ctx.is_dev()`) a [`LocalEngineService`] is used, otherwise an
/// [`EngineService`]; both are polled by the same "consensus engine" task.
impl<Types, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
where
Types: ProviderNodeTypes + NodeTypesWithEngine,
T: FullNodeTypes<Types = Types, Provider = BlockchainProvider2<Types>>,
CB: NodeComponentsBuilder<T>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
<<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
>,
{
/// Handle returned to the caller once the node has been launched.
type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
/// Launches the node described by `target`.
///
/// The steps, in order: build the launch context, launch the installed ExExs, build the
/// sync pipeline and pruner, create the engine service, spawn the "events task" and the
/// "consensus engine" critical tasks, launch the add-ons, and finally fire the
/// `on_node_started` hook.
///
/// # Errors
///
/// Returns the first error produced by any of the fallible setup steps (every `?` below),
/// including a missing etherscan URL / API key when the etherscan debug client is enabled.
async fn launch_node(
self,
target: NodeBuilderWithComponents<T, CB, AO>,
) -> eyre::Result<Self::Node> {
// Take the launcher and the builder apart into the pieces used below.
let Self { ctx, engine_tree_config } = self;
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
components_builder,
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
config,
} = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
// Broadcast channel for canonical state notifications; its capacity is twice the max
// reorg depth of the default tree config. The receiver half is not used here.
let tree_config = BlockchainTreeConfig::default();
let (canon_state_notification_sender, _receiver) =
tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
// Build up the launch context step by step; each fallible step aborts the launch on error.
let ctx = ctx
.with_configured_globals()
// Load the toml config for the given node config.
.with_loaded_toml_config(config)?
.with_resolved_peers().await?
// Attach the database handle taken from the builder.
.attach(database.clone())
.with_adjusted_configs()
.with_provider_factory().await?
.inspect(|_| {
info!(target: "reth::cli", "Database opened");
})
.with_prometheus_server().await?
.inspect(|this| {
debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
})
.with_genesis()?
// Log the hardforks of the chain spec after genesis has been set up.
.inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
})
.with_metrics_task()
// `T` is passed explicitly as the node type; the provider is a `BlockchainProvider2`
// built from the provider factory.
.with_blockchain_db::<T, _>(move |provider_factory| {
Ok(BlockchainProvider2::new(provider_factory)?)
}, tree_config, canon_state_notification_sender)?
.with_components(components_builder, on_component_initialized).await?;
// Launch the installed ExExs. The resulting manager handle is optional and is handled
// as such below (`unwrap_or_else` / `if let Some`).
let exex_manager_handle = ExExLauncher::new(
ctx.head(),
ctx.node_adapter().clone(),
installed_exex,
ctx.configs().clone(),
)
.launch()
.await?;
let network_client = ctx.components().network().fetch_client().await?;
// Channel carrying engine messages: the sender goes to the beacon engine handle (and
// to the local engine service in dev mode), the receiver becomes the engine's input stream.
let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
let node_config = ctx.node_config();
// Wrap the incoming message stream with the adapters configured under `debug`.
// `maybe_store_messages` is applied last, so it wraps the already skip/reorg-adapted stream.
let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
.maybe_skip_fcu(node_config.debug.skip_fcu)
.maybe_skip_new_payload(node_config.debug.skip_new_payload)
.maybe_reorg(
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
node_config.debug.reorg_frequency,
node_config.debug.reorg_depth,
)
.maybe_store_messages(node_config.debug.engine_api_store.clone());
let max_block = ctx.max_block(network_client.clone()).await?;
// NOTE(review): `hooks` is populated with the static file hook here but is not read
// again anywhere in this function — confirm whether it is still needed.
let mut hooks = EngineHooks::new();
let static_file_producer = ctx.static_file_producer();
// Grab the event stream before the producer is moved into the pipeline below.
let static_file_producer_events = static_file_producer.lock().events();
hooks.add(StaticFileHook::new(
static_file_producer.clone(),
Box::new(ctx.task_executor().clone()),
));
info!(target: "reth::cli", "StaticFileProducer initialized");
// Build the networked pipeline; an empty ExEx manager handle is used when no ExEx
// manager was launched.
let pipeline_exex_handle =
exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
let pipeline = build_networked_pipeline(
&ctx.toml_config().stages,
network_client.clone(),
ctx.consensus(),
ctx.provider_factory().clone(),
ctx.task_executor(),
ctx.sync_metrics_tx(),
ctx.prune_config(),
max_block,
static_file_producer,
ctx.components().block_executor().clone(),
pipeline_exex_handle,
)?;
// Presumably brings the static files up to date before the engine starts writing —
// TODO confirm against `Pipeline::move_to_static_files`.
pipeline.move_to_static_files()?;
// Subscribe to pipeline events before the pipeline is (possibly) moved into the service.
let pipeline_events = pipeline.events();
// Build the pruner; if an ExEx manager exists, its finished height is passed to the builder.
let mut pruner_builder = ctx.pruner_builder();
if let Some(exex_manager_handle) = &exex_manager_handle {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
}
let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
// Create the engine service. Dev mode yields `Either::Left(LocalEngineService)`, which
// does not receive `pipeline`; otherwise `Either::Right(EngineService)` takes ownership of
// the pipeline and the network client.
let mut engine_service = if ctx.is_dev() {
let eth_service = LocalEngineService::new(
ctx.consensus(),
ctx.components().block_executor().clone(),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
ctx.components().payload_builder().clone(),
engine_tree_config,
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
consensus_engine_tx.clone(),
Box::pin(consensus_engine_stream),
ctx.dev_mining_mode(ctx.components().pool()),
LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
);
Either::Left(eth_service)
} else {
let eth_service = EngineService::new(
ctx.consensus(),
ctx.components().block_executor().clone(),
ctx.chain_spec(),
network_client.clone(),
Box::pin(consensus_engine_stream),
pipeline,
Box::new(ctx.task_executor().clone()),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
ctx.components().payload_builder().clone(),
engine_tree_config,
ctx.invalid_block_hook()?,
ctx.sync_metrics_tx(),
);
Either::Right(eth_service)
};
// `event_sender` is shared: the handle exposes it to listeners, and the consensus
// engine task below notifies it with handler events.
let event_sender = EventSender::default();
let beacon_engine_handle =
BeaconConsensusEngineHandle::new(consensus_engine_tx, event_sender.clone());
info!(target: "reth::cli", "Consensus engine initialized");
// Merge all node event sources into a single stream. Consensus layer health events are
// only included when no debug tip is set and the node is not in dev mode; otherwise an
// empty stream takes their place.
let events = stream_select!(
ctx.components().network().event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left(
ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
.map(Into::into),
)
} else {
Either::Right(stream::empty())
},
pruner_events.map(Into::into),
static_file_producer_events.map(Into::into),
);
// Hand the merged event stream to the node event handler on a critical task.
ctx.task_executor().spawn_critical(
"events task",
node::handle_events(
Some(Box::new(ctx.components().network().clone())),
Some(ctx.head().number),
events,
),
);
// Resolve the JWT secret and launch the add-ons, which yields the RPC handles.
let jwt_secret = ctx.auth_jwt_secret()?;
let add_ons_ctx = AddOnsContext {
node: ctx.node_adapter().clone(),
config: ctx.node_config(),
beacon_engine_handle,
jwt_secret,
};
let RpcHandle { rpc_server_handles, rpc_registry } =
add_ons.launch_add_ons(add_ons_ctx).await?;
// Optional debug consensus client backed by etherscan, enabled when `debug.etherscan`
// is set. A custom URL wins; otherwise the first element (`urls.0`) of the chain's
// etherscan URLs is used. A missing URL or API key aborts the launch.
if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() {
info!(target: "reth::cli", "Using etherscan as consensus client");
let chain = ctx.node_config().chain.chain();
let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| {
chain
.etherscan_urls()
.map(|urls| urls.0.to_string())
.ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}"))
})?;
let block_provider = EtherscanBlockProvider::new(
etherscan_url,
chain.etherscan_api_key().ok_or_else(|| {
eyre::eyre!(
"etherscan api key not found for rpc consensus client for chain: {chain}"
)
})?,
);
let rpc_consensus_client = DebugConsensusClient::new(
rpc_server_handles.auth.clone(),
Arc::new(block_provider),
);
ctx.task_executor().spawn_critical("etherscan consensus client", async move {
rpc_consensus_client.run::<<Types as NodeTypesWithEngine>::Engine>().await
});
}
// Collect everything the consensus engine task needs; it is an `async move` block, so
// these values are moved into it.
let initial_target = ctx.initial_backfill_target()?;
let network_handle = ctx.components().network().clone();
// Fused stream of payloads built by the payload builder.
let mut built_payloads = ctx
.components()
.payload_builder()
.subscribe()
.await
.map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
.into_built_payload_stream()
.fuse();
let chainspec = ctx.chain_spec();
// `exit` carries the task's final result to the node exit future built at the end.
let (exit, rx) = oneshot::channel();
let terminate_after_backfill = ctx.terminate_after_initial_backfill();
info!(target: "reth::cli", "Starting consensus engine");
ctx.task_executor().spawn_critical("consensus engine", async move {
// Kick off backfill sync if an initial target exists. This only applies to the
// `Either::Right` (non-dev) service.
if let Some(initial_target) = initial_target {
debug!(target: "reth::cli", %initial_target, "start backfill sync");
if let Either::Right(eth_service) = &mut engine_service {
eth_service.orchestrator_mut().start_backfill_sync(initial_target);
}
}
let mut res = Ok(());
// Main loop: poll the engine service for chain events and, concurrently, forward
// executed blocks of built payloads to the engine handler as `InsertExecutedBlock`
// requests (non-dev service only) — presumably so they need not be executed again
// later; confirm against the handler. The loop ends when the service stream ends, on a
// fatal error, or after backfill finishes with `terminate_after_backfill` set.
loop {
tokio::select! {
payload = built_payloads.select_next_some() => {
if let Some(executed_block) = payload.executed_block() {
debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload");
if let Either::Right(eth_service) = &mut engine_service {
eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
}
}
}
event = engine_service.next() => {
let Some(event) = event else { break };
debug!(target: "reth::cli", "Event: {event}");
match event {
ChainEvent::BackfillSyncFinished => {
if terminate_after_backfill {
debug!(target: "reth::cli", "Terminating after initial backfill");
break
}
network_handle.update_sync_state(SyncState::Idle);
}
ChainEvent::BackfillSyncStarted => {
network_handle.update_sync_state(SyncState::Syncing);
}
ChainEvent::FatalError => {
error!(target: "reth::cli", "Fatal error in consensus engine");
res = Err(eyre::eyre!("Fatal error in consensus engine"));
break
}
ChainEvent::Handler(ev) => {
// On a new canonical header, update the network status with the new
// head; total difficulty falls back to its default when the chain
// spec has no final Paris total difficulty for that block number.
if let Some(head) = ev.canonical_header() {
let head_block = Head {
number: head.number,
hash: head.hash(),
difficulty: head.difficulty,
timestamp: head.timestamp,
total_difficulty: chainspec
.final_paris_total_difficulty(head.number)
.unwrap_or_default(),
};
network_handle.update_status(head_block);
}
event_sender.notify(ev);
}
}
}
}
}
// Report the outcome; a send error (receiver already dropped) is deliberately ignored.
let _ = exit.send(res);
});
// Assemble the public node handle from the launched components.
let full_node = FullNode {
evm_config: ctx.components().evm_config().clone(),
block_executor: ctx.components().block_executor().clone(),
pool: ctx.components().pool().clone(),
network: ctx.components().network().clone(),
provider: ctx.node_adapter().provider.clone(),
payload_builder: ctx.components().payload_builder().clone(),
task_executor: ctx.task_executor().clone(),
config: ctx.node_config().clone(),
data_dir: ctx.data_dir().clone(),
add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
};
// Run the user-provided `on_node_started` hook with a clone of the node.
on_node_started.on_event(FullNode::clone(&full_node))?;
// The exit future yields the result sent by the consensus engine task; a receive error
// on the oneshot channel is propagated via `?`.
let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(
async { rx.await? },
full_node.config.debug.terminate,
),
node: full_node,
};
Ok(handle)
}
}