1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_tree::{
15 chain::{ChainEvent, FromOrchestrator},
16 engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
17 launch::build_engine_orchestrator,
18 tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28 args::PruneConfigKind,
29 dirs::{ChainPath, DataDirPath},
30 exit::NodeExitFuture,
31 primitives::Head,
32};
33use reth_node_events::node;
34use reth_provider::{
35 providers::{BlockchainProvider, NodeTypesForProvider},
36 BlockNumReader, StorageSettingsCache,
37};
38use reth_tasks::TaskExecutor;
39use reth_tokio_util::EventSender;
40use reth_tracing::tracing::{debug, error, info};
41use reth_trie_db::ChangesetCache;
42use std::{future::Future, pin::Pin, sync::Arc};
43use tokio::sync::{mpsc::unbounded_channel, oneshot};
44use tokio_stream::wrappers::UnboundedReceiverStream;
45
46#[derive(Debug)]
48pub struct EngineNodeLauncher {
49 pub ctx: LaunchContext,
51
52 pub engine_tree_config: TreeConfig,
55}
56
57impl EngineNodeLauncher {
58 pub const fn new(
60 task_executor: TaskExecutor,
61 data_dir: ChainPath<DataDirPath>,
62 engine_tree_config: TreeConfig,
63 ) -> Self {
64 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
65 }
66
67 async fn launch_node<T, CB, AO>(
68 self,
69 target: NodeBuilderWithComponents<T, CB, AO>,
70 ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
71 where
72 T: FullNodeTypes<
73 Types: NodeTypesForProvider,
74 Provider = BlockchainProvider<
75 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
76 >,
77 >,
78 CB: NodeComponentsBuilder<T>,
79 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
80 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
81 {
82 let Self { ctx, engine_tree_config } = self;
83 let NodeBuilderWithComponents {
84 adapter: NodeTypesAdapter { database },
85 rocksdb_provider,
86 components_builder,
87 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
88 config,
89 } = target;
90 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
91
92 let changeset_cache = ChangesetCache::new();
94
95 let ctx = ctx
97 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
98 .with_loaded_toml_config(config)?
100 .with_resolved_peers()?
102 .attach(database.clone())
104 .with_adjusted_configs()
106 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
108 .inspect(|_| {
109 info!(target: "reth::cli", "Database opened");
110 })
111 .with_prometheus_server().await?
112 .inspect(|this| {
113 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
114 })
115 .with_genesis()?
116 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
117 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
118 let settings = this.provider_factory().cached_storage_settings();
119 let pruning_mode =
120 PruneConfigKind::from_config(&this.prune_config(), this.chain_spec().as_ref()).as_str();
121 info!(target: "reth::cli", ?settings, ?pruning_mode, "Loaded storage settings");
122 })
123 .with_metrics_task()
124 .with_blockchain_db::<T, _>(move |provider_factory| {
127 Ok(BlockchainProvider::new(provider_factory)?)
128 })?
129 .with_components(components_builder, on_component_initialized).await?;
130
131 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
133
134 let network_handle = ctx.components().network().clone();
136 let network_client = network_handle.fetch_client().await?;
137 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
138
139 let node_config = ctx.node_config();
140
141 network_handle.update_sync_state(SyncState::Syncing);
143
144 let max_block = ctx.max_block(network_client.clone()).await?;
145
146 let static_file_producer = ctx.static_file_producer();
147 let static_file_producer_events = static_file_producer.lock().events();
148 info!(target: "reth::cli", "StaticFileProducer initialized");
149
150 let consensus = Arc::new(ctx.components().consensus().clone());
151
152 let pipeline = build_networked_pipeline(
153 &ctx.toml_config().stages,
154 network_client.clone(),
155 consensus.clone(),
156 ctx.provider_factory().clone(),
157 ctx.task_executor(),
158 ctx.sync_metrics_tx(),
159 ctx.prune_config(),
160 max_block,
161 static_file_producer,
162 ctx.components().evm_config().clone(),
163 maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
164 ctx.era_import_source(),
165 )?;
166
167 pipeline.move_to_static_files()?;
169
170 let pipeline_events = pipeline.events();
171
172 let mut pruner_builder = ctx.pruner_builder();
173 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
174 pruner_builder =
175 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
176 }
177 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
178 let pruner_events = pruner.events();
179 info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
180
181 let event_sender = EventSender::default();
182
183 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
184
185 let jwt_secret = ctx.auth_jwt_secret()?;
187
188 let add_ons_ctx = AddOnsContext {
189 node: ctx.node_adapter().clone(),
190 config: ctx.node_config(),
191 beacon_engine_handle: beacon_engine_handle.clone(),
192 jwt_secret,
193 engine_events: event_sender.clone(),
194 };
195 let validator_builder = add_ons.engine_validator_builder();
196
197 let engine_validator = validator_builder
199 .clone()
200 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
201 .await?;
202
203 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
205 .maybe_skip_fcu(node_config.debug.skip_fcu)
206 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
207 .maybe_reorg(
208 ctx.blockchain_db().clone(),
209 ctx.components().evm_config().clone(),
210 || async {
211 validator_builder
212 .build_tree_validator(
213 &add_ons_ctx,
214 engine_tree_config.clone(),
215 changeset_cache.clone(),
216 )
217 .await
218 },
219 node_config.debug.reorg_frequency,
220 node_config.debug.reorg_depth,
221 )
222 .await?
223 .maybe_store_messages(node_config.debug.engine_api_store.clone());
227
228 let engine_kind = if ctx.chain_spec().is_optimism() {
229 EngineApiKind::OpStack
230 } else {
231 EngineApiKind::Ethereum
232 };
233
234 let mut orchestrator = build_engine_orchestrator(
235 engine_kind,
236 consensus.clone(),
237 network_client.clone(),
238 Box::pin(consensus_engine_stream),
239 pipeline,
240 ctx.task_executor().clone(),
241 ctx.provider_factory().clone(),
242 ctx.blockchain_db().clone(),
243 pruner,
244 ctx.components().payload_builder_handle().clone(),
245 engine_validator,
246 engine_tree_config,
247 ctx.sync_metrics_tx(),
248 ctx.components().evm_config().clone(),
249 changeset_cache,
250 ctx.task_executor().clone(),
251 );
252
253 info!(target: "reth::cli", "Consensus engine initialized");
254
255 #[expect(clippy::needless_continue)]
256 let events = stream_select!(
257 event_sender.new_listener().map(Into::into),
258 pipeline_events.map(Into::into),
259 ctx.consensus_layer_events(),
260 pruner_events.map(Into::into),
261 static_file_producer_events.map(Into::into),
262 );
263
264 ctx.task_executor().spawn_critical_task(
265 "events task",
266 node::handle_events(
267 Some(Box::new(ctx.components().network().clone())),
268 Some(ctx.head().number),
269 events,
270 ),
271 );
272
273 let RpcHandle {
274 rpc_server_handles,
275 rpc_registry,
276 engine_events,
277 beacon_engine_handle,
278 engine_shutdown: _,
279 } = add_ons.launch_add_ons(add_ons_ctx).await?;
280
281 let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
283
284 let initial_target = ctx.initial_backfill_target()?;
286 let mut built_payloads = ctx
287 .components()
288 .payload_builder_handle()
289 .subscribe()
290 .await
291 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
292 .into_built_payload_stream()
293 .fuse();
294
295 let chainspec = ctx.chain_spec();
296 let provider = ctx.blockchain_db().clone();
297 let (exit, rx) = oneshot::channel();
298 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
299 let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
300
301 info!(target: "reth::cli", "Starting consensus engine");
302 let consensus_engine = move |mut on_graceful_shutdown| async move {
303 if let Some(initial_target) = initial_target {
304 debug!(target: "reth::cli", %initial_target, "start backfill sync");
305 orchestrator.start_backfill_sync(initial_target);
307 } else if startup_sync_state_idle {
308 network_handle.update_sync_state(SyncState::Idle);
309 }
310
311 let mut res = Ok(());
312 let mut shutdown_rx = shutdown_rx.fuse();
313
314 loop {
318 tokio::select! {
319 event = orchestrator.next() => {
320 let Some(event) = event else { break };
321 debug!(target: "reth::cli", "Event: {event}");
322 match event {
323 ChainEvent::BackfillSyncFinished => {
324 if terminate_after_backfill {
325 debug!(target: "reth::cli", "Terminating after initial backfill");
326 break
327 }
328 if startup_sync_state_idle {
329 network_handle.update_sync_state(SyncState::Idle);
330 }
331 }
332 ChainEvent::BackfillSyncStarted => {
333 network_handle.update_sync_state(SyncState::Syncing);
334 }
335 ChainEvent::FatalError => {
336 error!(target: "reth::cli", "Fatal error in consensus engine");
337 res = Err(eyre::eyre!("Fatal error in consensus engine"));
338 break
339 }
340 ChainEvent::Handler(ev) => {
341 if let Some(head) = ev.canonical_header() {
342 network_handle.update_sync_state(SyncState::Idle);
344 let head_block = Head {
345 number: head.number(),
346 hash: head.hash(),
347 difficulty: head.difficulty(),
348 timestamp: head.timestamp(),
349 total_difficulty: chainspec.final_paris_total_difficulty()
350 .filter(|_| chainspec.is_paris_active_at_block(head.number()))
351 .unwrap_or_default(),
352 };
353 network_handle.update_status(head_block);
354
355 let updated = BlockRangeUpdate {
356 earliest: provider.earliest_block_number().unwrap_or_default(),
357 latest: head.number(),
358 latest_hash: head.hash(),
359 };
360 network_handle.update_block_range(updated);
361 }
362 event_sender.notify(ev);
363 }
364 }
365 }
366 payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
367 if let Some(executed_block) = payload.executed_block() {
368 debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
369 orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
370 }
371 }
372 shutdown_req = &mut shutdown_rx => {
373 if let Ok(req) = shutdown_req {
374 debug!(target: "reth::cli", "received engine shutdown request");
375 orchestrator.handler_mut().handler_mut().on_event(
376 FromOrchestrator::Terminate { tx: req.done_tx }.into()
377 );
378 }
379 }
380 _guard = &mut on_graceful_shutdown => {
381 debug!(target: "reth::cli", "shutdown signal received, terminating engine");
385 let (done_tx, done_rx) = oneshot::channel();
386 orchestrator.handler_mut().handler_mut().on_event(
387 FromOrchestrator::Terminate { tx: done_tx }.into()
388 );
389 let _ = done_rx.await;
390 break;
391 }
392 }
393 }
394
395 let _ = exit.send(res);
396 };
397 ctx.task_executor()
398 .spawn_critical_with_graceful_shutdown_signal("consensus engine", consensus_engine);
399
400 let engine_events_for_ethstats = engine_events.new_listener();
401
402 let full_node = FullNode {
403 evm_config: ctx.components().evm_config().clone(),
404 pool: ctx.components().pool().clone(),
405 network: ctx.components().network().clone(),
406 provider: ctx.node_adapter().provider.clone(),
407 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
408 task_executor: ctx.task_executor().clone(),
409 config: ctx.node_config().clone(),
410 data_dir: ctx.data_dir().clone(),
411 add_ons_handle: RpcHandle {
412 rpc_server_handles,
413 rpc_registry,
414 engine_events,
415 beacon_engine_handle,
416 engine_shutdown,
417 },
418 };
419 on_node_started.on_event(FullNode::clone(&full_node))?;
421
422 ctx.spawn_ethstats(engine_events_for_ethstats).await?;
423
424 let handle = NodeHandle {
425 node_exit_future: NodeExitFuture::new(async { rx.await? }),
426 node: full_node,
427 };
428
429 Ok(handle)
430 }
431}
432
433impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
434where
435 T: FullNodeTypes<
436 Types: NodeTypesForProvider,
437 Provider = BlockchainProvider<
438 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
439 >,
440 >,
441 CB: NodeComponentsBuilder<T> + 'static,
442 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
443 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
444 + 'static,
445{
446 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
447 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
448
449 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
450 Box::pin(self.launch_node(target))
451 }
452}