1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, Node, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10 RethFullAdapter,
11};
12use alloy_consensus::BlockHeader;
13use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
14use reth_chainspec::{EthChainSpec, EthereumHardforks};
15use reth_db::{database_metrics::DatabaseMetrics, Database};
16use reth_engine_tree::{
17 chain::{ChainEvent, FromOrchestrator},
18 engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
19 launch::build_engine_orchestrator,
20 tree::TreeConfig,
21};
22use reth_engine_util::EngineMessageStreamExt;
23use reth_exex::ExExManagerHandle;
24use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
25use reth_network_api::BlockDownloaderProvider;
26use reth_node_api::{
27 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
28};
29use reth_node_core::{
30 args::PruneConfigKind,
31 dirs::{ChainPath, DataDirPath},
32 exit::NodeExitFuture,
33 primitives::Head,
34};
35use reth_node_events::node;
36use reth_provider::{
37 providers::{BlockchainProvider, NodeTypesForProvider},
38 BlockNumReader, StorageSettingsCache,
39};
40use reth_tasks::TaskExecutor;
41use reth_tokio_util::EventSender;
42use reth_tracing::tracing::{debug, error, info};
43use reth_trie_db::ChangesetCache;
44use std::{future::Future, pin::Pin, sync::Arc};
45use tokio::sync::{mpsc::unbounded_channel, oneshot};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47
48#[derive(Debug)]
50pub struct EngineNodeLauncher {
51 pub ctx: LaunchContext,
53
54 pub engine_tree_config: TreeConfig,
57}
58
59impl EngineNodeLauncher {
60 pub const fn new(
62 task_executor: TaskExecutor,
63 data_dir: ChainPath<DataDirPath>,
64 engine_tree_config: TreeConfig,
65 ) -> Self {
66 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
67 }
68
69 async fn launch_node<N, DB, T, CB, AO>(
70 self,
71 target: NodeBuilderWithComponents<T, CB, AO>,
72 ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
73 where
74 N: Node<RethFullAdapter<DB, N>> + NodeTypesForProvider,
75 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
76 T: FullNodeTypes<
77 Types = N,
78 Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, DB>>,
79 DB = DB,
80 >,
81 CB: NodeComponentsBuilder<T>,
82 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
83 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
84 {
85 let Self { ctx, engine_tree_config } = self;
86 let NodeBuilderWithComponents {
87 adapter: NodeTypesAdapter { database },
88 rocksdb_provider,
89 components_builder,
90 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
91 config,
92 } = target;
93 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
94
95 let changeset_cache = ChangesetCache::new();
97 let disabled_stages = N::disabled_stages();
98
99 let ctx = ctx
101 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
102 .with_loaded_toml_config(config)?
104 .with_resolved_peers()?
106 .attach(database.clone())
108 .with_adjusted_configs()
110 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(
112 changeset_cache.clone(),
113 rocksdb_provider,
114 disabled_stages,
115 )
116 .await?
117 .inspect(|_| {
118 info!(target: "reth::cli", "Database opened");
119 })
120 .with_prometheus_server().await?
121 .inspect(|this| {
122 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
123 })
124 .with_genesis()?
125 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
126 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
127 let settings = this.provider_factory().cached_storage_settings();
128 let pruning_mode =
129 PruneConfigKind::from_config(&this.prune_config(), this.chain_spec().as_ref()).as_str();
130 info!(target: "reth::cli", ?settings, ?pruning_mode, "Loaded storage settings");
131 })
132 .with_metrics_task()
133 .with_blockchain_db::<T, _>(move |provider_factory| {
136 Ok(BlockchainProvider::new(provider_factory)?)
137 })?
138 .with_components(components_builder, on_component_initialized).await?;
139
140 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
142
143 let network_handle = ctx.components().network().clone();
145 let network_client = network_handle.fetch_client().await?;
146 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
147
148 let node_config = ctx.node_config();
149
150 network_handle.update_sync_state(SyncState::Syncing);
152
153 let max_block = ctx.max_block(network_client.clone()).await?;
154
155 let static_file_producer = ctx.static_file_producer();
156 let static_file_producer_events = static_file_producer.lock().events();
157 info!(target: "reth::cli", "StaticFileProducer initialized");
158
159 let consensus = Arc::new(ctx.components().consensus().clone());
160
161 let pipeline = build_networked_pipeline(
162 &ctx.toml_config().stages,
163 network_client.clone(),
164 consensus.clone(),
165 ctx.provider_factory().clone(),
166 ctx.task_executor(),
167 ctx.sync_metrics_tx(),
168 ctx.prune_config(),
169 max_block,
170 static_file_producer,
171 ctx.components().evm_config().clone(),
172 maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
173 ctx.era_import_source(),
174 disabled_stages,
175 )?;
176
177 pipeline.move_to_static_files()?;
179
180 let pipeline_events = pipeline.events();
181
182 let mut pruner_builder = ctx.pruner_builder();
183 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
184 pruner_builder =
185 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
186 }
187 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
188 let pruner_events = pruner.events();
189 info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
190
191 let event_sender = EventSender::default();
192
193 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
194
195 let jwt_secret = ctx.auth_jwt_secret()?;
197
198 let add_ons_ctx = AddOnsContext {
199 node: ctx.node_adapter().clone(),
200 config: ctx.node_config(),
201 beacon_engine_handle: beacon_engine_handle.clone(),
202 jwt_secret,
203 engine_events: event_sender.clone(),
204 };
205 let validator_builder = add_ons.engine_validator_builder();
206
207 let engine_validator = validator_builder
209 .clone()
210 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
211 .await?;
212
213 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
215 .maybe_skip_fcu(node_config.debug.skip_fcu)
216 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
217 .maybe_reorg(
218 ctx.blockchain_db().clone(),
219 ctx.components().evm_config().clone(),
220 || async {
221 validator_builder
222 .build_tree_validator(
223 &add_ons_ctx,
224 engine_tree_config.clone(),
225 changeset_cache.clone(),
226 )
227 .await
228 },
229 node_config.debug.reorg_frequency,
230 node_config.debug.reorg_depth,
231 )
232 .await?
233 .maybe_store_messages(node_config.debug.engine_api_store.clone());
237
238 let engine_kind = if ctx.chain_spec().is_optimism() {
239 EngineApiKind::OpStack
240 } else {
241 EngineApiKind::Ethereum
242 };
243
244 let mut orchestrator = build_engine_orchestrator(
245 engine_kind,
246 consensus.clone(),
247 network_client.clone(),
248 Box::pin(consensus_engine_stream),
249 pipeline,
250 ctx.task_executor().clone(),
251 ctx.provider_factory().clone(),
252 ctx.blockchain_db().clone(),
253 pruner,
254 ctx.components().payload_builder_handle().clone(),
255 engine_validator,
256 engine_tree_config,
257 ctx.sync_metrics_tx(),
258 ctx.components().evm_config().clone(),
259 changeset_cache,
260 ctx.task_executor().clone(),
261 );
262
263 info!(target: "reth::cli", "Consensus engine initialized");
264
265 #[expect(clippy::needless_continue)]
266 let events = stream_select!(
267 event_sender.new_listener().map(Into::into),
268 pipeline_events.map(Into::into),
269 ctx.consensus_layer_events(),
270 pruner_events.map(Into::into),
271 static_file_producer_events.map(Into::into),
272 );
273
274 ctx.task_executor().spawn_critical_task(
275 "events task",
276 node::handle_events(
277 Some(Box::new(ctx.components().network().clone())),
278 Some(ctx.head().number),
279 events,
280 ),
281 );
282
283 let RpcHandle {
284 rpc_server_handles,
285 rpc_registry,
286 engine_events,
287 beacon_engine_handle,
288 engine_shutdown: _,
289 } = add_ons.launch_add_ons(add_ons_ctx).await?;
290
291 let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
293
294 let initial_target = ctx.initial_backfill_target(disabled_stages)?;
296 let mut built_payloads = ctx
297 .components()
298 .payload_builder_handle()
299 .subscribe()
300 .await
301 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
302 .into_built_payload_stream()
303 .fuse();
304
305 let chainspec = ctx.chain_spec();
306 let provider = ctx.blockchain_db().clone();
307 let (exit, rx) = oneshot::channel();
308 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
309 let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
310
311 info!(target: "reth::cli", "Starting consensus engine");
312 let consensus_engine = move |mut on_graceful_shutdown| async move {
313 if let Some(initial_target) = initial_target {
314 debug!(target: "reth::cli", %initial_target, "start backfill sync");
315 orchestrator.start_backfill_sync(initial_target);
317 } else if startup_sync_state_idle {
318 network_handle.update_sync_state(SyncState::Idle);
319 }
320
321 let mut res = Ok(());
322 let mut shutdown_rx = shutdown_rx.fuse();
323
324 loop {
328 tokio::select! {
329 event = orchestrator.next() => {
330 let Some(event) = event else { break };
331 debug!(target: "reth::cli", "Event: {event}");
332 match event {
333 ChainEvent::BackfillSyncFinished => {
334 if terminate_after_backfill {
335 debug!(target: "reth::cli", "Terminating after initial backfill");
336 break
337 }
338 if startup_sync_state_idle {
339 network_handle.update_sync_state(SyncState::Idle);
340 }
341 }
342 ChainEvent::BackfillSyncStarted => {
343 network_handle.update_sync_state(SyncState::Syncing);
344 }
345 ChainEvent::FatalError => {
346 error!(target: "reth::cli", "Fatal error in consensus engine");
347 res = Err(eyre::eyre!("Fatal error in consensus engine"));
348 break
349 }
350 ChainEvent::Handler(ev) => {
351 if let Some(head) = ev.canonical_header() {
352 network_handle.update_sync_state(SyncState::Idle);
354 let head_block = Head {
355 number: head.number(),
356 hash: head.hash(),
357 difficulty: head.difficulty(),
358 timestamp: head.timestamp(),
359 total_difficulty: chainspec.final_paris_total_difficulty()
360 .filter(|_| chainspec.is_paris_active_at_block(head.number()))
361 .unwrap_or_default(),
362 };
363 network_handle.update_status(head_block);
364
365 let updated = BlockRangeUpdate {
366 earliest: provider.earliest_block_number().unwrap_or_default(),
367 latest: head.number(),
368 latest_hash: head.hash(),
369 };
370 network_handle.update_block_range(updated);
371 }
372 event_sender.notify(ev);
373 }
374 }
375 }
376 payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
377 if let Some(executed_block) = payload.executed_block() {
378 debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
379 orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
380 }
381 }
382 shutdown_req = &mut shutdown_rx => {
383 if let Ok(req) = shutdown_req {
384 debug!(target: "reth::cli", "received engine shutdown request");
385 orchestrator.handler_mut().handler_mut().on_event(
386 FromOrchestrator::Terminate { tx: req.done_tx }.into()
387 );
388 }
389 }
390 _guard = &mut on_graceful_shutdown => {
391 debug!(target: "reth::cli", "shutdown signal received, terminating engine");
395 let (done_tx, done_rx) = oneshot::channel();
396 orchestrator.handler_mut().handler_mut().on_event(
397 FromOrchestrator::Terminate { tx: done_tx }.into()
398 );
399 let _ = done_rx.await;
400 break;
401 }
402 }
403 }
404
405 let _ = exit.send(res);
406 };
407 ctx.task_executor()
408 .spawn_critical_with_graceful_shutdown_signal("consensus engine", consensus_engine);
409
410 let engine_events_for_ethstats = engine_events.new_listener();
411
412 let full_node = FullNode {
413 evm_config: ctx.components().evm_config().clone(),
414 pool: ctx.components().pool().clone(),
415 network: ctx.components().network().clone(),
416 provider: ctx.node_adapter().provider.clone(),
417 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
418 task_executor: ctx.task_executor().clone(),
419 config: ctx.node_config().clone(),
420 data_dir: ctx.data_dir().clone(),
421 add_ons_handle: RpcHandle {
422 rpc_server_handles,
423 rpc_registry,
424 engine_events,
425 beacon_engine_handle,
426 engine_shutdown,
427 },
428 };
429 on_node_started.on_event(FullNode::clone(&full_node))?;
431
432 ctx.spawn_ethstats(engine_events_for_ethstats).await?;
433
434 let handle = NodeHandle {
435 node_exit_future: NodeExitFuture::new(async { rx.await? }),
436 node: full_node,
437 };
438
439 Ok(handle)
440 }
441}
442
443impl<N, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
444where
445 T: FullNodeTypes<
446 Types = N,
447 DB = DB,
448 Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, DB>>,
449 >,
450 N: Node<RethFullAdapter<DB, N>> + NodeTypesForProvider,
451 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
452 CB: NodeComponentsBuilder<T> + 'static,
453 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
454 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
455 + 'static,
456{
457 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
458 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
459
460 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
461 Box::pin(self.launch_node(target))
462 }
463}