1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, FutureExt, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_service::service::{ChainEvent, EngineService};
15use reth_engine_tree::{
16 chain::FromOrchestrator,
17 engine::{EngineApiRequest, EngineRequestHandler},
18 tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28 dirs::{ChainPath, DataDirPath},
29 exit::NodeExitFuture,
30 primitives::Head,
31};
32use reth_node_events::node;
33use reth_provider::{
34 providers::{BlockchainProvider, NodeTypesForProvider},
35 BlockNumReader, MetadataProvider,
36};
37use reth_tasks::TaskExecutor;
38use reth_tokio_util::EventSender;
39use reth_tracing::tracing::{debug, error, info};
40use std::{future::Future, pin::Pin, sync::Arc};
41use tokio::sync::{mpsc::unbounded_channel, oneshot};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43use tracing::warn;
44
45#[derive(Debug)]
47pub struct EngineNodeLauncher {
48 pub ctx: LaunchContext,
50
51 pub engine_tree_config: TreeConfig,
54}
55
56impl EngineNodeLauncher {
57 pub const fn new(
59 task_executor: TaskExecutor,
60 data_dir: ChainPath<DataDirPath>,
61 engine_tree_config: TreeConfig,
62 ) -> Self {
63 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
64 }
65
66 async fn launch_node<T, CB, AO>(
67 self,
68 target: NodeBuilderWithComponents<T, CB, AO>,
69 ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
70 where
71 T: FullNodeTypes<
72 Types: NodeTypesForProvider,
73 Provider = BlockchainProvider<
74 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
75 >,
76 >,
77 CB: NodeComponentsBuilder<T>,
78 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
79 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
80 {
81 let Self { ctx, engine_tree_config } = self;
82 let NodeBuilderWithComponents {
83 adapter: NodeTypesAdapter { database },
84 components_builder,
85 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
86 config,
87 } = target;
88 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
89
90 let ctx = ctx
92 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
93 .with_loaded_toml_config(config)?
95 .with_resolved_peers()?
97 .attach(database.clone())
99 .with_adjusted_configs()
101 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
103 .inspect(|ctx| {
104 info!(target: "reth::cli", "Database opened");
105 match ctx.provider_factory().storage_settings() {
106 Ok(settings) => {
107 info!(
108 target: "reth::cli",
109 ?settings,
110 "Storage settings"
111 );
112 },
113 Err(err) => {
114 warn!(
115 target: "reth::cli",
116 ?err,
117 "Failed to get storage settings"
118 );
119 },
120 }
121 })
122 .with_prometheus_server().await?
123 .inspect(|this| {
124 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
125 })
126 .with_genesis()?
127 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
128 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
129 })
130 .with_metrics_task()
131 .with_blockchain_db::<T, _>(move |provider_factory| {
134 Ok(BlockchainProvider::new(provider_factory)?)
135 })?
136 .with_components(components_builder, on_component_initialized).await?;
137
138 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
140
141 let network_handle = ctx.components().network().clone();
143 let network_client = network_handle.fetch_client().await?;
144 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
145
146 let node_config = ctx.node_config();
147
148 network_handle.update_sync_state(SyncState::Syncing);
150
151 let max_block = ctx.max_block(network_client.clone()).await?;
152
153 let static_file_producer = ctx.static_file_producer();
154 let static_file_producer_events = static_file_producer.lock().events();
155 info!(target: "reth::cli", "StaticFileProducer initialized");
156
157 let consensus = Arc::new(ctx.components().consensus().clone());
158
159 let pipeline = build_networked_pipeline(
160 &ctx.toml_config().stages,
161 network_client.clone(),
162 consensus.clone(),
163 ctx.provider_factory().clone(),
164 ctx.task_executor(),
165 ctx.sync_metrics_tx(),
166 ctx.prune_config(),
167 max_block,
168 static_file_producer,
169 ctx.components().evm_config().clone(),
170 maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
171 ctx.era_import_source(),
172 )?;
173
174 pipeline.move_to_static_files()?;
176
177 let pipeline_events = pipeline.events();
178
179 let mut pruner_builder = ctx.pruner_builder();
180 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
181 pruner_builder =
182 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
183 }
184 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
185 let pruner_events = pruner.events();
186 info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
187
188 let event_sender = EventSender::default();
189
190 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
191
192 let jwt_secret = ctx.auth_jwt_secret()?;
194
195 let add_ons_ctx = AddOnsContext {
196 node: ctx.node_adapter().clone(),
197 config: ctx.node_config(),
198 beacon_engine_handle: beacon_engine_handle.clone(),
199 jwt_secret,
200 engine_events: event_sender.clone(),
201 };
202 let validator_builder = add_ons.engine_validator_builder();
203
204 let engine_validator = validator_builder
206 .clone()
207 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
208 .await?;
209
210 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
212 .maybe_skip_fcu(node_config.debug.skip_fcu)
213 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
214 .maybe_reorg(
215 ctx.blockchain_db().clone(),
216 ctx.components().evm_config().clone(),
217 || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
218 node_config.debug.reorg_frequency,
219 node_config.debug.reorg_depth,
220 )
221 .await?
222 .maybe_store_messages(node_config.debug.engine_api_store.clone());
226
227 let mut engine_service = EngineService::new(
228 consensus.clone(),
229 ctx.chain_spec(),
230 network_client.clone(),
231 Box::pin(consensus_engine_stream),
232 pipeline,
233 Box::new(ctx.task_executor().clone()),
234 ctx.provider_factory().clone(),
235 ctx.blockchain_db().clone(),
236 pruner,
237 ctx.components().payload_builder_handle().clone(),
238 engine_validator,
239 engine_tree_config,
240 ctx.sync_metrics_tx(),
241 ctx.components().evm_config().clone(),
242 );
243
244 info!(target: "reth::cli", "Consensus engine initialized");
245
246 #[allow(clippy::needless_continue)]
247 let events = stream_select!(
248 event_sender.new_listener().map(Into::into),
249 pipeline_events.map(Into::into),
250 ctx.consensus_layer_events(),
251 pruner_events.map(Into::into),
252 static_file_producer_events.map(Into::into),
253 );
254
255 ctx.task_executor().spawn_critical(
256 "events task",
257 Box::pin(node::handle_events(
258 Some(Box::new(ctx.components().network().clone())),
259 Some(ctx.head().number),
260 events,
261 )),
262 );
263
264 let RpcHandle {
265 rpc_server_handles,
266 rpc_registry,
267 engine_events,
268 beacon_engine_handle,
269 engine_shutdown: _,
270 } = add_ons.launch_add_ons(add_ons_ctx).await?;
271
272 let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
274
275 let initial_target = ctx.initial_backfill_target()?;
277 let mut built_payloads = ctx
278 .components()
279 .payload_builder_handle()
280 .subscribe()
281 .await
282 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
283 .into_built_payload_stream()
284 .fuse();
285
286 let chainspec = ctx.chain_spec();
287 let provider = ctx.blockchain_db().clone();
288 let (exit, rx) = oneshot::channel();
289 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
290 let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
291
292 info!(target: "reth::cli", "Starting consensus engine");
293 let consensus_engine = async move {
294 if let Some(initial_target) = initial_target {
295 debug!(target: "reth::cli", %initial_target, "start backfill sync");
296 engine_service.orchestrator_mut().start_backfill_sync(initial_target);
298 } else if startup_sync_state_idle {
299 network_handle.update_sync_state(SyncState::Idle);
300 }
301
302 let mut res = Ok(());
303 let mut shutdown_rx = shutdown_rx.fuse();
304
305 loop {
309 tokio::select! {
310 shutdown_req = &mut shutdown_rx => {
311 if let Ok(req) = shutdown_req {
312 debug!(target: "reth::cli", "received engine shutdown request");
313 engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
314 FromOrchestrator::Terminate { tx: req.done_tx }.into()
315 );
316 }
317 }
318 payload = built_payloads.select_next_some() => {
319 if let Some(executed_block) = payload.executed_block() {
320 debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
321 engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
322 }
323 }
324 event = engine_service.next() => {
325 let Some(event) = event else { break };
326 debug!(target: "reth::cli", "Event: {event}");
327 match event {
328 ChainEvent::BackfillSyncFinished => {
329 if terminate_after_backfill {
330 debug!(target: "reth::cli", "Terminating after initial backfill");
331 break
332 }
333 if startup_sync_state_idle {
334 network_handle.update_sync_state(SyncState::Idle);
335 }
336 }
337 ChainEvent::BackfillSyncStarted => {
338 network_handle.update_sync_state(SyncState::Syncing);
339 }
340 ChainEvent::FatalError => {
341 error!(target: "reth::cli", "Fatal error in consensus engine");
342 res = Err(eyre::eyre!("Fatal error in consensus engine"));
343 break
344 }
345 ChainEvent::Handler(ev) => {
346 if let Some(head) = ev.canonical_header() {
347 network_handle.update_sync_state(SyncState::Idle);
349 let head_block = Head {
350 number: head.number(),
351 hash: head.hash(),
352 difficulty: head.difficulty(),
353 timestamp: head.timestamp(),
354 total_difficulty: chainspec.final_paris_total_difficulty()
355 .filter(|_| chainspec.is_paris_active_at_block(head.number()))
356 .unwrap_or_default(),
357 };
358 network_handle.update_status(head_block);
359
360 let updated = BlockRangeUpdate {
361 earliest: provider.earliest_block_number().unwrap_or_default(),
362 latest: head.number(),
363 latest_hash: head.hash(),
364 };
365 network_handle.update_block_range(updated);
366 }
367 event_sender.notify(ev);
368 }
369 }
370 }
371 }
372 }
373
374 let _ = exit.send(res);
375 };
376 ctx.task_executor().spawn_critical("consensus engine", Box::pin(consensus_engine));
377
378 let engine_events_for_ethstats = engine_events.new_listener();
379
380 let full_node = FullNode {
381 evm_config: ctx.components().evm_config().clone(),
382 pool: ctx.components().pool().clone(),
383 network: ctx.components().network().clone(),
384 provider: ctx.node_adapter().provider.clone(),
385 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
386 task_executor: ctx.task_executor().clone(),
387 config: ctx.node_config().clone(),
388 data_dir: ctx.data_dir().clone(),
389 add_ons_handle: RpcHandle {
390 rpc_server_handles,
391 rpc_registry,
392 engine_events,
393 beacon_engine_handle,
394 engine_shutdown,
395 },
396 };
397 on_node_started.on_event(FullNode::clone(&full_node))?;
399
400 ctx.spawn_ethstats(engine_events_for_ethstats).await?;
401
402 let handle = NodeHandle {
403 node_exit_future: NodeExitFuture::new(
404 async { rx.await? },
405 full_node.config.debug.terminate,
406 ),
407 node: full_node,
408 };
409
410 Ok(handle)
411 }
412}
413
414impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
415where
416 T: FullNodeTypes<
417 Types: NodeTypesForProvider,
418 Provider = BlockchainProvider<
419 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
420 >,
421 >,
422 CB: NodeComponentsBuilder<T> + 'static,
423 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
424 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
425 + 'static,
426{
427 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
428 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
429
430 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
431 Box::pin(self.launch_node(target))
432 }
433}