1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{future::Either, stream, stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_db_api::{database_metrics::DatabaseMetrics, Database};
15use reth_engine_local::{LocalMiner, LocalPayloadAttributesBuilder};
16use reth_engine_service::service::{ChainEvent, EngineService};
17use reth_engine_tree::{
18 engine::{EngineApiRequest, EngineRequestHandler},
19 tree::TreeConfig,
20};
21use reth_engine_util::EngineMessageStreamExt;
22use reth_exex::ExExManagerHandle;
23use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
24use reth_network_api::BlockDownloaderProvider;
25use reth_node_api::{
26 BeaconConsensusEngineHandle, BuiltPayload, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
27 PayloadAttributesBuilder, PayloadTypes,
28};
29use reth_node_core::{
30 args::DefaultEraHost,
31 dirs::{ChainPath, DataDirPath},
32 exit::NodeExitFuture,
33 primitives::Head,
34};
35use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
36use reth_provider::{
37 providers::{BlockchainProvider, NodeTypesForProvider},
38 BlockNumReader,
39};
40use reth_stages::stages::EraImportSource;
41use reth_tasks::TaskExecutor;
42use reth_tokio_util::EventSender;
43use reth_tracing::tracing::{debug, error, info};
44use std::sync::Arc;
45use tokio::sync::{mpsc::unbounded_channel, oneshot};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47
48#[derive(Debug)]
50pub struct EngineNodeLauncher {
51 pub ctx: LaunchContext,
53
54 pub engine_tree_config: TreeConfig,
57}
58
59impl EngineNodeLauncher {
60 pub const fn new(
62 task_executor: TaskExecutor,
63 data_dir: ChainPath<DataDirPath>,
64 engine_tree_config: TreeConfig,
65 ) -> Self {
66 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
67 }
68}
69
70impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
71where
72 Types: NodeTypesForProvider + NodeTypes,
73 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
74 T: FullNodeTypes<
75 Types = Types,
76 DB = DB,
77 Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
78 >,
79 CB: NodeComponentsBuilder<T>,
80 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
81 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
82 LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
83 <<Types as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
84 >,
85{
86 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
87
88 async fn launch_node(
89 self,
90 target: NodeBuilderWithComponents<T, CB, AO>,
91 ) -> eyre::Result<Self::Node> {
92 let Self { ctx, engine_tree_config } = self;
93 let NodeBuilderWithComponents {
94 adapter: NodeTypesAdapter { database },
95 components_builder,
96 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
97 config,
98 } = target;
99 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
100
101 let ctx = ctx
103 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
104 .with_loaded_toml_config(config)?
106 .with_resolved_peers()?
108 .attach(database.clone())
110 .with_adjusted_configs()
112 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
114 .inspect(|_| {
115 info!(target: "reth::cli", "Database opened");
116 })
117 .with_prometheus_server().await?
118 .inspect(|this| {
119 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
120 })
121 .with_genesis()?
122 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
123 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
124 })
125 .with_metrics_task()
126 .with_blockchain_db::<T, _>(move |provider_factory| {
129 Ok(BlockchainProvider::new(provider_factory)?)
130 })?
131 .with_components(components_builder, on_component_initialized).await?;
132
133 let exex_manager_handle = ExExLauncher::new(
135 ctx.head(),
136 ctx.node_adapter().clone(),
137 installed_exex,
138 ctx.configs().clone(),
139 )
140 .launch()
141 .await?;
142
143 let network_handle = ctx.components().network().clone();
145 let network_client = network_handle.fetch_client().await?;
146 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
147
148 let node_config = ctx.node_config();
149
150 network_handle.update_sync_state(SyncState::Syncing);
152
153 let max_block = ctx.max_block(network_client.clone()).await?;
154
155 let static_file_producer = ctx.static_file_producer();
156 let static_file_producer_events = static_file_producer.lock().events();
157 info!(target: "reth::cli", "StaticFileProducer initialized");
158
159 let consensus = Arc::new(ctx.components().consensus().clone());
160
161 let pipeline_exex_handle =
163 exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
164
165 let era_import_source = if node_config.era.enabled {
166 EraImportSource::maybe_new(
167 node_config.era.source.path.clone(),
168 node_config.era.source.url.clone(),
169 || node_config.chain.chain().kind().default_era_host(),
170 || node_config.datadir().data_dir().join("era").into(),
171 )
172 } else {
173 None
174 };
175
176 let pipeline = build_networked_pipeline(
177 &ctx.toml_config().stages,
178 network_client.clone(),
179 consensus.clone(),
180 ctx.provider_factory().clone(),
181 ctx.task_executor(),
182 ctx.sync_metrics_tx(),
183 ctx.prune_config(),
184 max_block,
185 static_file_producer,
186 ctx.components().evm_config().clone(),
187 pipeline_exex_handle,
188 era_import_source,
189 )?;
190
191 pipeline.move_to_static_files()?;
193
194 let pipeline_events = pipeline.events();
195
196 let mut pruner_builder = ctx.pruner_builder();
197 if let Some(exex_manager_handle) = &exex_manager_handle {
198 pruner_builder =
199 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
200 }
201 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
202 let pruner_events = pruner.events();
203 info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
204
205 let event_sender = EventSender::default();
206
207 let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
208
209 let jwt_secret = ctx.auth_jwt_secret()?;
211
212 let add_ons_ctx = AddOnsContext {
213 node: ctx.node_adapter().clone(),
214 config: ctx.node_config(),
215 beacon_engine_handle: beacon_engine_handle.clone(),
216 jwt_secret,
217 engine_events: event_sender.clone(),
218 };
219 let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
220
221 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
222 .maybe_skip_fcu(node_config.debug.skip_fcu)
223 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
224 .maybe_reorg(
225 ctx.blockchain_db().clone(),
226 ctx.components().evm_config().clone(),
227 engine_payload_validator.clone(),
228 node_config.debug.reorg_frequency,
229 node_config.debug.reorg_depth,
230 )
231 .maybe_store_messages(node_config.debug.engine_api_store.clone());
235
236 let mut engine_service = EngineService::new(
237 consensus.clone(),
238 ctx.chain_spec(),
239 network_client.clone(),
240 Box::pin(consensus_engine_stream),
241 pipeline,
242 Box::new(ctx.task_executor().clone()),
243 ctx.provider_factory().clone(),
244 ctx.blockchain_db().clone(),
245 pruner,
246 ctx.components().payload_builder_handle().clone(),
247 engine_payload_validator,
248 engine_tree_config,
249 ctx.invalid_block_hook()?,
250 ctx.sync_metrics_tx(),
251 ctx.components().evm_config().clone(),
252 );
253
254 if ctx.is_dev() {
255 ctx.task_executor().spawn_critical(
256 "local engine",
257 LocalMiner::new(
258 ctx.blockchain_db().clone(),
259 LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
260 beacon_engine_handle.clone(),
261 ctx.dev_mining_mode(ctx.components().pool()),
262 ctx.components().payload_builder_handle().clone(),
263 )
264 .run(),
265 );
266 }
267
268 info!(target: "reth::cli", "Consensus engine initialized");
269
270 let events = stream_select!(
271 event_sender.new_listener().map(Into::into),
272 pipeline_events.map(Into::into),
273 if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
274 Either::Left(
275 ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
276 .map(Into::into),
277 )
278 } else {
279 Either::Right(stream::empty())
280 },
281 pruner_events.map(Into::into),
282 static_file_producer_events.map(Into::into),
283 );
284
285 ctx.task_executor().spawn_critical(
286 "events task",
287 node::handle_events(
288 Some(Box::new(ctx.components().network().clone())),
289 Some(ctx.head().number),
290 events,
291 ),
292 );
293
294 let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
295 add_ons.launch_add_ons(add_ons_ctx).await?;
296
297 let initial_target = ctx.initial_backfill_target()?;
299 let mut built_payloads = ctx
300 .components()
301 .payload_builder_handle()
302 .subscribe()
303 .await
304 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
305 .into_built_payload_stream()
306 .fuse();
307
308 let chainspec = ctx.chain_spec();
309 let provider = ctx.blockchain_db().clone();
310 let (exit, rx) = oneshot::channel();
311 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
312
313 info!(target: "reth::cli", "Starting consensus engine");
314 ctx.task_executor().spawn_critical("consensus engine", async move {
315 if let Some(initial_target) = initial_target {
316 debug!(target: "reth::cli", %initial_target, "start backfill sync");
317 engine_service.orchestrator_mut().start_backfill_sync(initial_target);
318 }
319
320 let mut res = Ok(());
321
322 loop {
324 tokio::select! {
325 payload = built_payloads.select_next_some() => {
326 if let Some(executed_block) = payload.executed_block() {
327 debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(), "inserting built payload");
328 engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
329 }
330 }
331 event = engine_service.next() => {
332 let Some(event) = event else { break };
333 debug!(target: "reth::cli", "Event: {event}");
334 match event {
335 ChainEvent::BackfillSyncFinished => {
336 if terminate_after_backfill {
337 debug!(target: "reth::cli", "Terminating after initial backfill");
338 break
339 }
340 }
341 ChainEvent::BackfillSyncStarted => {
342 network_handle.update_sync_state(SyncState::Syncing);
343 }
344 ChainEvent::FatalError => {
345 error!(target: "reth::cli", "Fatal error in consensus engine");
346 res = Err(eyre::eyre!("Fatal error in consensus engine"));
347 break
348 }
349 ChainEvent::Handler(ev) => {
350 if let Some(head) = ev.canonical_header() {
351 network_handle.update_sync_state(SyncState::Idle);
353 let head_block = Head {
354 number: head.number(),
355 hash: head.hash(),
356 difficulty: head.difficulty(),
357 timestamp: head.timestamp(),
358 total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
359 };
360 network_handle.update_status(head_block);
361
362 let updated = BlockRangeUpdate {
363 earliest: provider.earliest_block_number().unwrap_or_default(),
364 latest:head.number(),
365 latest_hash:head.hash()
366 };
367 network_handle.update_block_range(updated);
368 }
369 event_sender.notify(ev);
370 }
371 }
372 }
373 }
374 }
375
376 let _ = exit.send(res);
377 });
378
379 let full_node = FullNode {
380 evm_config: ctx.components().evm_config().clone(),
381 pool: ctx.components().pool().clone(),
382 network: ctx.components().network().clone(),
383 provider: ctx.node_adapter().provider.clone(),
384 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
385 task_executor: ctx.task_executor().clone(),
386 config: ctx.node_config().clone(),
387 data_dir: ctx.data_dir().clone(),
388 add_ons_handle: RpcHandle {
389 rpc_server_handles,
390 rpc_registry,
391 engine_events,
392 beacon_engine_handle,
393 },
394 };
395 on_node_started.on_event(FullNode::clone(&full_node))?;
397
398 let handle = NodeHandle {
399 node_exit_future: NodeExitFuture::new(
400 async { rx.await? },
401 full_node.config.debug.terminate,
402 ),
403 node: full_node,
404 };
405
406 Ok(handle)
407 }
408}