1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_tree::{
15 chain::{ChainEvent, FromOrchestrator},
16 engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
17 launch::build_engine_orchestrator,
18 tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28 dirs::{ChainPath, DataDirPath},
29 exit::NodeExitFuture,
30 primitives::Head,
31};
32use reth_node_events::node;
33use reth_provider::{
34 providers::{BlockchainProvider, NodeTypesForProvider},
35 BlockNumReader, StorageSettingsCache,
36};
37use reth_tasks::TaskExecutor;
38use reth_tokio_util::EventSender;
39use reth_tracing::tracing::{debug, error, info};
40use reth_trie_db::ChangesetCache;
41use std::{future::Future, pin::Pin, sync::Arc};
42use tokio::sync::{mpsc::unbounded_channel, oneshot};
43use tokio_stream::wrappers::UnboundedReceiverStream;
44
45#[derive(Debug)]
47pub struct EngineNodeLauncher {
48 pub ctx: LaunchContext,
50
51 pub engine_tree_config: TreeConfig,
54}
55
56impl EngineNodeLauncher {
57 pub const fn new(
59 task_executor: TaskExecutor,
60 data_dir: ChainPath<DataDirPath>,
61 engine_tree_config: TreeConfig,
62 ) -> Self {
63 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
64 }
65
66 async fn launch_node<T, CB, AO>(
67 self,
68 target: NodeBuilderWithComponents<T, CB, AO>,
69 ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
70 where
71 T: FullNodeTypes<
72 Types: NodeTypesForProvider,
73 Provider = BlockchainProvider<
74 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
75 >,
76 >,
77 CB: NodeComponentsBuilder<T>,
78 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
79 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
80 {
81 let Self { ctx, engine_tree_config } = self;
82 let NodeBuilderWithComponents {
83 adapter: NodeTypesAdapter { database },
84 rocksdb_provider,
85 components_builder,
86 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
87 config,
88 } = target;
89 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
90
91 let changeset_cache = ChangesetCache::new();
93
94 let ctx = ctx
96 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
97 .with_loaded_toml_config(config)?
99 .with_resolved_peers()?
101 .attach(database.clone())
103 .with_adjusted_configs()
105 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
107 .inspect(|_| {
108 info!(target: "reth::cli", "Database opened");
109 })
110 .with_prometheus_server().await?
111 .inspect(|this| {
112 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
113 })
114 .with_genesis()?
115 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
116 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
117 let settings = this.provider_factory().cached_storage_settings();
118 info!(target: "reth::cli", ?settings, "Loaded storage settings");
119 })
120 .with_metrics_task()
121 .with_blockchain_db::<T, _>(move |provider_factory| {
124 Ok(BlockchainProvider::new(provider_factory)?)
125 })?
126 .with_components(components_builder, on_component_initialized).await?;
127
128 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
130
131 let network_handle = ctx.components().network().clone();
133 let network_client = network_handle.fetch_client().await?;
134 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
135
136 let node_config = ctx.node_config();
137
138 network_handle.update_sync_state(SyncState::Syncing);
140
141 let max_block = ctx.max_block(network_client.clone()).await?;
142
143 let static_file_producer = ctx.static_file_producer();
144 let static_file_producer_events = static_file_producer.lock().events();
145 info!(target: "reth::cli", "StaticFileProducer initialized");
146
147 let consensus = Arc::new(ctx.components().consensus().clone());
148
149 let pipeline = build_networked_pipeline(
150 &ctx.toml_config().stages,
151 network_client.clone(),
152 consensus.clone(),
153 ctx.provider_factory().clone(),
154 ctx.task_executor(),
155 ctx.sync_metrics_tx(),
156 ctx.prune_config(),
157 max_block,
158 static_file_producer,
159 ctx.components().evm_config().clone(),
160 maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
161 ctx.era_import_source(),
162 )?;
163
164 pipeline.move_to_static_files()?;
166
167 let pipeline_events = pipeline.events();
168
169 let mut pruner_builder = ctx.pruner_builder();
170 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
171 pruner_builder =
172 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
173 }
174 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
175 let pruner_events = pruner.events();
176 info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
177
178 let event_sender = EventSender::default();
179
180 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
181
182 let jwt_secret = ctx.auth_jwt_secret()?;
184
185 let add_ons_ctx = AddOnsContext {
186 node: ctx.node_adapter().clone(),
187 config: ctx.node_config(),
188 beacon_engine_handle: beacon_engine_handle.clone(),
189 jwt_secret,
190 engine_events: event_sender.clone(),
191 };
192 let validator_builder = add_ons.engine_validator_builder();
193
194 let engine_validator = validator_builder
196 .clone()
197 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
198 .await?;
199
200 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
202 .maybe_skip_fcu(node_config.debug.skip_fcu)
203 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
204 .maybe_reorg(
205 ctx.blockchain_db().clone(),
206 ctx.components().evm_config().clone(),
207 || async {
208 let reorg_cache = ChangesetCache::new();
210 validator_builder
211 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
212 .await
213 },
214 node_config.debug.reorg_frequency,
215 node_config.debug.reorg_depth,
216 )
217 .await?
218 .maybe_store_messages(node_config.debug.engine_api_store.clone());
222
223 let engine_kind = if ctx.chain_spec().is_optimism() {
224 EngineApiKind::OpStack
225 } else {
226 EngineApiKind::Ethereum
227 };
228
229 let mut orchestrator = build_engine_orchestrator(
230 engine_kind,
231 consensus.clone(),
232 network_client.clone(),
233 Box::pin(consensus_engine_stream),
234 pipeline,
235 ctx.task_executor().clone(),
236 ctx.provider_factory().clone(),
237 ctx.blockchain_db().clone(),
238 pruner,
239 ctx.components().payload_builder_handle().clone(),
240 engine_validator,
241 engine_tree_config,
242 ctx.sync_metrics_tx(),
243 ctx.components().evm_config().clone(),
244 changeset_cache,
245 ctx.task_executor().clone(),
246 );
247
248 info!(target: "reth::cli", "Consensus engine initialized");
249
250 #[allow(clippy::needless_continue)]
251 let events = stream_select!(
252 event_sender.new_listener().map(Into::into),
253 pipeline_events.map(Into::into),
254 ctx.consensus_layer_events(),
255 pruner_events.map(Into::into),
256 static_file_producer_events.map(Into::into),
257 );
258
259 ctx.task_executor().spawn_critical_task(
260 "events task",
261 node::handle_events(
262 Some(Box::new(ctx.components().network().clone())),
263 Some(ctx.head().number),
264 events,
265 ),
266 );
267
268 let RpcHandle {
269 rpc_server_handles,
270 rpc_registry,
271 engine_events,
272 beacon_engine_handle,
273 engine_shutdown: _,
274 } = add_ons.launch_add_ons(add_ons_ctx).await?;
275
276 let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
278
279 let initial_target = ctx.initial_backfill_target()?;
281 let mut built_payloads = ctx
282 .components()
283 .payload_builder_handle()
284 .subscribe()
285 .await
286 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
287 .into_built_payload_stream()
288 .fuse();
289
290 let chainspec = ctx.chain_spec();
291 let provider = ctx.blockchain_db().clone();
292 let (exit, rx) = oneshot::channel();
293 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
294 let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
295
296 info!(target: "reth::cli", "Starting consensus engine");
297 let consensus_engine = move |mut on_graceful_shutdown| async move {
298 if let Some(initial_target) = initial_target {
299 debug!(target: "reth::cli", %initial_target, "start backfill sync");
300 orchestrator.start_backfill_sync(initial_target);
302 } else if startup_sync_state_idle {
303 network_handle.update_sync_state(SyncState::Idle);
304 }
305
306 let mut res = Ok(());
307 let mut shutdown_rx = shutdown_rx.fuse();
308
309 loop {
313 tokio::select! {
314 event = orchestrator.next() => {
315 let Some(event) = event else { break };
316 debug!(target: "reth::cli", "Event: {event}");
317 match event {
318 ChainEvent::BackfillSyncFinished => {
319 if terminate_after_backfill {
320 debug!(target: "reth::cli", "Terminating after initial backfill");
321 break
322 }
323 if startup_sync_state_idle {
324 network_handle.update_sync_state(SyncState::Idle);
325 }
326 }
327 ChainEvent::BackfillSyncStarted => {
328 network_handle.update_sync_state(SyncState::Syncing);
329 }
330 ChainEvent::FatalError => {
331 error!(target: "reth::cli", "Fatal error in consensus engine");
332 res = Err(eyre::eyre!("Fatal error in consensus engine"));
333 break
334 }
335 ChainEvent::Handler(ev) => {
336 if let Some(head) = ev.canonical_header() {
337 network_handle.update_sync_state(SyncState::Idle);
339 let head_block = Head {
340 number: head.number(),
341 hash: head.hash(),
342 difficulty: head.difficulty(),
343 timestamp: head.timestamp(),
344 total_difficulty: chainspec.final_paris_total_difficulty()
345 .filter(|_| chainspec.is_paris_active_at_block(head.number()))
346 .unwrap_or_default(),
347 };
348 network_handle.update_status(head_block);
349
350 let updated = BlockRangeUpdate {
351 earliest: provider.earliest_block_number().unwrap_or_default(),
352 latest: head.number(),
353 latest_hash: head.hash(),
354 };
355 network_handle.update_block_range(updated);
356 }
357 event_sender.notify(ev);
358 }
359 }
360 }
361 payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
362 if let Some(executed_block) = payload.executed_block() {
363 debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
364 orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
365 }
366 }
367 shutdown_req = &mut shutdown_rx => {
368 if let Ok(req) = shutdown_req {
369 debug!(target: "reth::cli", "received engine shutdown request");
370 orchestrator.handler_mut().handler_mut().on_event(
371 FromOrchestrator::Terminate { tx: req.done_tx }.into()
372 );
373 }
374 }
375 _guard = &mut on_graceful_shutdown => {
376 debug!(target: "reth::cli", "shutdown signal received, terminating engine");
380 let (done_tx, done_rx) = oneshot::channel();
381 orchestrator.handler_mut().handler_mut().on_event(
382 FromOrchestrator::Terminate { tx: done_tx }.into()
383 );
384 let _ = done_rx.await;
385 break;
386 }
387 }
388 }
389
390 let _ = exit.send(res);
391 };
392 ctx.task_executor()
393 .spawn_critical_with_graceful_shutdown_signal("consensus engine", consensus_engine);
394
395 let engine_events_for_ethstats = engine_events.new_listener();
396
397 let full_node = FullNode {
398 evm_config: ctx.components().evm_config().clone(),
399 pool: ctx.components().pool().clone(),
400 network: ctx.components().network().clone(),
401 provider: ctx.node_adapter().provider.clone(),
402 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
403 task_executor: ctx.task_executor().clone(),
404 config: ctx.node_config().clone(),
405 data_dir: ctx.data_dir().clone(),
406 add_ons_handle: RpcHandle {
407 rpc_server_handles,
408 rpc_registry,
409 engine_events,
410 beacon_engine_handle,
411 engine_shutdown,
412 },
413 };
414 on_node_started.on_event(FullNode::clone(&full_node))?;
416
417 ctx.spawn_ethstats(engine_events_for_ethstats).await?;
418
419 let handle = NodeHandle {
420 node_exit_future: NodeExitFuture::new(async { rx.await? }),
421 node: full_node,
422 };
423
424 Ok(handle)
425 }
426}
427
428impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
429where
430 T: FullNodeTypes<
431 Types: NodeTypesForProvider,
432 Provider = BlockchainProvider<
433 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
434 >,
435 >,
436 CB: NodeComponentsBuilder<T> + 'static,
437 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
438 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
439 + 'static,
440{
441 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
442 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
443
444 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
445 Box::pin(self.launch_node(target))
446 }
447}