1use alloy_consensus::BlockHeader;
4use futures::{future::Either, stream, stream_select, StreamExt};
5use reth_chainspec::{EthChainSpec, EthereumHardforks};
6use reth_db_api::{database_metrics::DatabaseMetrics, Database};
7use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
8use reth_engine_service::service::{ChainEvent, EngineService};
9use reth_engine_tree::{
10 engine::{EngineApiRequest, EngineRequestHandler},
11 tree::TreeConfig,
12};
13use reth_engine_util::EngineMessageStreamExt;
14use reth_exex::ExExManagerHandle;
15use reth_network::{NetworkSyncUpdater, SyncState};
16use reth_network_api::BlockDownloaderProvider;
17use reth_node_api::{
18 BeaconConsensusEngineHandle, BuiltPayload, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
19 PayloadAttributesBuilder, PayloadTypes,
20};
21use reth_node_core::{
22 dirs::{ChainPath, DataDirPath},
23 exit::NodeExitFuture,
24 primitives::Head,
25};
26use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
27use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
28use reth_tasks::TaskExecutor;
29use reth_tokio_util::EventSender;
30use reth_tracing::tracing::{debug, error, info};
31use std::sync::Arc;
32use tokio::sync::{mpsc::unbounded_channel, oneshot};
33use tokio_stream::wrappers::UnboundedReceiverStream;
34
35use crate::{
36 common::{Attached, LaunchContextWith, WithConfigs},
37 hooks::NodeHooks,
38 rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
39 setup::build_networked_pipeline,
40 AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
41 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
42};
43
44#[derive(Debug)]
46pub struct EngineNodeLauncher {
47 pub ctx: LaunchContext,
49
50 pub engine_tree_config: TreeConfig,
53}
54
55impl EngineNodeLauncher {
56 pub const fn new(
58 task_executor: TaskExecutor,
59 data_dir: ChainPath<DataDirPath>,
60 engine_tree_config: TreeConfig,
61 ) -> Self {
62 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
63 }
64}
65
// `LaunchNode` implementation for `EngineNodeLauncher`: consumes a fully configured
// `NodeBuilderWithComponents` and turns it into a running node.
impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
where
    Types: NodeTypesForProvider + NodeTypes,
    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
    T: FullNodeTypes<
        Types = Types,
        DB = DB,
        Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
    >,
    CB: NodeComponentsBuilder<T>,
    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
    LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
        <<Types as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
    >,
{
    /// Handle returned by a successful launch: the node plus its exit future.
    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;

    /// Launches the node described by `target` and returns a handle to it.
    ///
    /// Steps, in order:
    /// 1. build the launch context (config, peers, database, provider factory, prometheus
    ///    server, genesis, metrics task, blockchain provider, components),
    /// 2. launch the installed ExExs,
    /// 3. build the networked pipeline and the pruner,
    /// 4. create the engine service: a [`LocalEngineService`] when `ctx.is_dev()` is true, an
    ///    [`EngineService`] otherwise,
    /// 5. spawn the critical "events task" and launch the add-ons,
    /// 6. spawn the critical "consensus engine" task that drives the engine service,
    /// 7. fire the `on_node_started` hook and return the [`NodeHandle`].
    ///
    /// # Errors
    ///
    /// Returns an error as soon as any fallible setup step fails (every `?` below). The outcome
    /// of the spawned "consensus engine" task is not returned here; it is sent over a oneshot
    /// channel whose receiver is awaited by the handle's `node_exit_future`.
    async fn launch_node(
        self,
        target: NodeBuilderWithComponents<T, CB, AO>,
    ) -> eyre::Result<Self::Node> {
        // Take the launcher and the builder apart into the pieces used below.
        let Self { ctx, engine_tree_config } = self;
        let NodeBuilderWithComponents {
            adapter: NodeTypesAdapter { database },
            components_builder,
            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
            config,
        } = target;
        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;

        // Set up the launch context step by step; each `?` aborts the launch on failure.
        let ctx = ctx
            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
            // load the toml config
            .with_loaded_toml_config(config)?
            // add resolved peers
            .with_resolved_peers()?
            // attach the database
            .attach(database.clone())
            // NOTE(review): presumably adjusts configs so dependent settings take effect —
            // confirm against `with_adjusted_configs`
            .with_adjusted_configs()
            // create the provider factory
            .with_provider_factory().await?
            .inspect(|_| {
                info!(target: "reth::cli", "Database opened");
            })
            .with_prometheus_server().await?
            .inspect(|this| {
                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
            })
            .with_genesis()?
            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
            })
            .with_metrics_task()
            // `T` (the `FullNodeTypes`) is passed explicitly as a type parameter; the closure
            // builds the `BlockchainProvider` from the provider factory.
            .with_blockchain_db::<T, _>(move |provider_factory| {
                Ok(BlockchainProvider::new(provider_factory)?)
            })?
            .with_components(components_builder, on_component_initialized).await?;

        // Launch the installed ExExs; the resulting handle is optional (see its uses below).
        let exex_manager_handle = ExExLauncher::new(
            ctx.head(),
            ctx.node_adapter().clone(),
            installed_exex,
            ctx.configs().clone(),
        )
        .launch()
        .await?;

        // Network client for the pipeline / engine service, and the channel that carries
        // engine API messages to the engine.
        let network_client = ctx.components().network().fetch_client().await?;
        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

        let node_config = ctx.node_config();

        let max_block = ctx.max_block(network_client.clone()).await?;

        // Subscribe to static file producer events before the producer is moved into the
        // pipeline below.
        let static_file_producer = ctx.static_file_producer();
        let static_file_producer_events = static_file_producer.lock().events();
        info!(target: "reth::cli", "StaticFileProducer initialized");

        let consensus = Arc::new(ctx.components().consensus().clone());

        // Configure the pipeline. If no ExEx manager handle exists, an empty one is used.
        let pipeline_exex_handle =
            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
        let pipeline = build_networked_pipeline(
            &ctx.toml_config().stages,
            network_client.clone(),
            consensus.clone(),
            ctx.provider_factory().clone(),
            ctx.task_executor(),
            ctx.sync_metrics_tx(),
            ctx.prune_config(),
            max_block,
            static_file_producer,
            ctx.components().block_executor().clone(),
            pipeline_exex_handle,
        )?;

        // NOTE(review): runs before the engine service is created; presumably brings static
        // files up to date with the database — confirm against `Pipeline::move_to_static_files`.
        pipeline.move_to_static_files()?;

        // Grab the event stream now: `pipeline` is moved into `EngineService::new` below.
        let pipeline_events = pipeline.events();

        // Build the pruner; when an ExEx manager handle exists, its finished height is passed
        // to the pruner builder.
        let mut pruner_builder = ctx.pruner_builder();
        if let Some(exex_manager_handle) = &exex_manager_handle {
            pruner_builder =
                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
        }
        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
        let pruner_events = pruner.events();
        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");

        // Sender used by the consensus engine task to publish handler events; a clone is also
        // given to the add-ons as `engine_events`.
        let event_sender = EventSender::default();

        let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());

        // JWT secret for the add-ons context; failure to obtain it aborts the launch.
        let jwt_secret = ctx.auth_jwt_secret()?;

        // Context shared by the engine validator construction and the add-on launch further down.
        let add_ons_ctx = AddOnsContext {
            node: ctx.node_adapter().clone(),
            config: ctx.node_config(),
            beacon_engine_handle: beacon_engine_handle.clone(),
            jwt_secret,
            engine_events: event_sender.clone(),
        };
        let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;

        // Wrap the incoming engine message stream with the optional adapters configured under
        // `node_config.debug`.
        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
            .maybe_skip_fcu(node_config.debug.skip_fcu)
            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
            .maybe_reorg(
                ctx.blockchain_db().clone(),
                ctx.components().evm_config().clone(),
                engine_payload_validator.clone(),
                node_config.debug.reorg_frequency,
                node_config.debug.reorg_depth,
            )
            // Message storing is applied last, i.e. after the skip/reorg adapters above.
            .maybe_store_messages(node_config.debug.engine_api_store.clone());

        // Dev mode uses the local engine service (`Either::Left`); otherwise the regular engine
        // service (`Either::Right`), which additionally receives the network client, the
        // pipeline and the task executor.
        let mut engine_service = if ctx.is_dev() {
            let eth_service = LocalEngineService::new(
                consensus.clone(),
                ctx.components().block_executor().clone(),
                ctx.provider_factory().clone(),
                ctx.blockchain_db().clone(),
                pruner,
                ctx.components().payload_builder_handle().clone(),
                engine_payload_validator,
                engine_tree_config,
                ctx.invalid_block_hook()?,
                ctx.sync_metrics_tx(),
                consensus_engine_tx.clone(),
                Box::pin(consensus_engine_stream),
                ctx.dev_mining_mode(ctx.components().pool()),
                LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
                ctx.components().evm_config().clone(),
            );

            Either::Left(eth_service)
        } else {
            let eth_service = EngineService::new(
                consensus.clone(),
                ctx.components().block_executor().clone(),
                ctx.chain_spec(),
                network_client.clone(),
                Box::pin(consensus_engine_stream),
                pipeline,
                Box::new(ctx.task_executor().clone()),
                ctx.provider_factory().clone(),
                ctx.blockchain_db().clone(),
                pruner,
                ctx.components().payload_builder_handle().clone(),
                engine_payload_validator,
                engine_tree_config,
                ctx.invalid_block_hook()?,
                ctx.sync_metrics_tx(),
                ctx.components().evm_config().clone(),
            );

            Either::Right(eth_service)
        };

        info!(target: "reth::cli", "Consensus engine initialized");

        // Merge all node event sources into one stream. Consensus layer health events are only
        // included when no debug tip is configured and the node is not in dev mode.
        let events = stream_select!(
            event_sender.new_listener().map(Into::into),
            pipeline_events.map(Into::into),
            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
                Either::Left(
                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
                        .map(Into::into),
                )
            } else {
                Either::Right(stream::empty())
            },
            pruner_events.map(Into::into),
            static_file_producer_events.map(Into::into),
        );

        // Hand the merged event stream to `node::handle_events` on a critical task.
        ctx.task_executor().spawn_critical(
            "events task",
            node::handle_events(
                Some(Box::new(ctx.components().network().clone())),
                Some(ctx.head().number),
                events,
            ),
        );

        // Launch the add-ons; the returned handle's parts are stored in `FullNode` below.
        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
            add_ons.launch_add_ons(add_ons_ctx).await?;

        // Collect everything the consensus engine task needs; the task is `async move`, so it
        // takes ownership of these values.
        let initial_target = ctx.initial_backfill_target()?;
        let network_handle = ctx.components().network().clone();
        let mut built_payloads = ctx
            .components()
            .payload_builder_handle()
            .subscribe()
            .await
            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
            .into_built_payload_stream()
            .fuse();
        let chainspec = ctx.chain_spec();
        // `exit` reports the task's final result; `rx` is awaited by the node exit future.
        let (exit, rx) = oneshot::channel();
        let terminate_after_backfill = ctx.terminate_after_initial_backfill();

        info!(target: "reth::cli", "Starting consensus engine");
        ctx.task_executor().spawn_critical("consensus engine", async move {
            // Kick off the initial backfill if a target exists. Only the regular engine service
            // (`Either::Right`) is asked to start it.
            if let Some(initial_target) = initial_target {
                debug!(target: "reth::cli", %initial_target, "start backfill sync");
                if let Either::Right(eth_service) = &mut engine_service {
                    eth_service.orchestrator_mut().start_backfill_sync(initial_target);
                }
            }

            // Result reported through `exit`; stays `Ok` unless a fatal error is observed.
            let mut res = Ok(());

            // Drive the engine service while also listening for locally built payloads.
            loop {
                tokio::select! {
                    // A locally built payload: if it carries an executed block, forward it to
                    // the engine handler as `InsertExecutedBlock` (regular engine service only).
                    // NOTE(review): `select_next_some` is polled by `tokio::select!` on every
                    // iteration — confirm its behaviour should this fused stream ever terminate.
                    payload = built_payloads.select_next_some() => {
                        if let Some(executed_block) = payload.executed_block() {
                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(), "inserting built payload");
                            if let Either::Right(eth_service) = &mut engine_service {
                                eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
                            }
                        }
                    }
                    event = engine_service.next() => {
                        // The engine service stream ended: leave the loop with the current `res`.
                        let Some(event) = event else { break };
                        debug!(target: "reth::cli", "Event: {event}");
                        match event {
                            ChainEvent::BackfillSyncFinished => {
                                // Optionally stop the task once the backfill is done.
                                if terminate_after_backfill {
                                    debug!(target: "reth::cli", "Terminating after initial backfill");
                                    break
                                }

                                network_handle.update_sync_state(SyncState::Idle);
                            }
                            ChainEvent::BackfillSyncStarted => {
                                network_handle.update_sync_state(SyncState::Syncing);
                            }
                            ChainEvent::FatalError => {
                                error!(target: "reth::cli", "Fatal error in consensus engine");
                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
                                break
                            }
                            ChainEvent::Handler(ev) => {
                                // When the event carries a canonical header, push the new head
                                // to the network. `total_difficulty` is the final Paris total
                                // difficulty if Paris is active at this block number, otherwise
                                // the default value.
                                if let Some(head) = ev.canonical_header() {
                                    let head_block = Head {
                                        number: head.number(),
                                        hash: head.hash(),
                                        difficulty: head.difficulty(),
                                        timestamp: head.timestamp(),
                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
                                    };
                                    network_handle.update_status(head_block);
                                }
                                // Every handler event is forwarded to the event listeners.
                                event_sender.notify(ev);
                            }
                        }
                    }
                }
            }

            // Report the outcome; a failed send is deliberately ignored.
            let _ = exit.send(res);
        });

        // Assemble the public node handle from clones of the launched components.
        let full_node = FullNode {
            evm_config: ctx.components().evm_config().clone(),
            block_executor: ctx.components().block_executor().clone(),
            pool: ctx.components().pool().clone(),
            network: ctx.components().network().clone(),
            provider: ctx.node_adapter().provider.clone(),
            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
            task_executor: ctx.task_executor().clone(),
            config: ctx.node_config().clone(),
            data_dir: ctx.data_dir().clone(),
            add_ons_handle: RpcHandle {
                rpc_server_handles,
                rpc_registry,
                engine_events,
                beacon_engine_handle,
            },
        };
        // Notify the `on_node_started` hook with a clone of the node; a hook error aborts the
        // launch.
        on_node_started.on_event(FullNode::clone(&full_node))?;

        // The exit future awaits the consensus engine task's result; `?` on `rx.await` also
        // surfaces the receive error if the sender is dropped without sending.
        let handle = NodeHandle {
            node_exit_future: NodeExitFuture::new(
                async { rx.await? },
                full_node.config.debug.terminate,
            ),
            node: full_node,
        };

        Ok(handle)
    }
}