1use alloy_consensus::BlockHeader;
4use futures::{future::Either, stream, stream_select, StreamExt};
5use reth_chainspec::EthChainSpec;
6use reth_db_api::{database_metrics::DatabaseMetrics, Database};
7use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
8use reth_engine_service::service::{ChainEvent, EngineService};
9use reth_engine_tree::{
10 engine::{EngineApiRequest, EngineRequestHandler},
11 tree::TreeConfig,
12};
13use reth_engine_util::EngineMessageStreamExt;
14use reth_exex::ExExManagerHandle;
15use reth_network::{NetworkSyncUpdater, SyncState};
16use reth_network_api::BlockDownloaderProvider;
17use reth_node_api::{
18 BeaconConsensusEngineHandle, BuiltPayload, FullNodeTypes, NodeTypesWithDBAdapter,
19 NodeTypesWithEngine, PayloadAttributesBuilder, PayloadTypes,
20};
21use reth_node_core::{
22 dirs::{ChainPath, DataDirPath},
23 exit::NodeExitFuture,
24 primitives::Head,
25};
26use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
27use reth_primitives::EthereumHardforks;
28use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
29use reth_tasks::TaskExecutor;
30use reth_tokio_util::EventSender;
31use reth_tracing::tracing::{debug, error, info};
32use std::sync::Arc;
33use tokio::sync::{mpsc::unbounded_channel, oneshot};
34use tokio_stream::wrappers::UnboundedReceiverStream;
35
36use crate::{
37 common::{Attached, LaunchContextWith, WithConfigs},
38 hooks::NodeHooks,
39 rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
40 setup::build_networked_pipeline,
41 AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
42 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
43};
44
45#[derive(Debug)]
47pub struct EngineNodeLauncher {
48 pub ctx: LaunchContext,
50
51 pub engine_tree_config: TreeConfig,
54}
55
56impl EngineNodeLauncher {
57 pub const fn new(
59 task_executor: TaskExecutor,
60 data_dir: ChainPath<DataDirPath>,
61 engine_tree_config: TreeConfig,
62 ) -> Self {
63 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
64 }
65}
66
67impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
68where
69 Types: NodeTypesForProvider + NodeTypesWithEngine,
70 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
71 T: FullNodeTypes<
72 Types = Types,
73 DB = DB,
74 Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
75 >,
76 CB: NodeComponentsBuilder<T>,
77 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
78 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
79 LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
80 <<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
81 >,
82{
83 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
84
85 async fn launch_node(
86 self,
87 target: NodeBuilderWithComponents<T, CB, AO>,
88 ) -> eyre::Result<Self::Node> {
89 let Self { ctx, engine_tree_config } = self;
90 let NodeBuilderWithComponents {
91 adapter: NodeTypesAdapter { database },
92 components_builder,
93 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
94 config,
95 } = target;
96 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
97
98 let ctx = ctx
100 .with_configured_globals()
101 .with_loaded_toml_config(config)?
103 .with_resolved_peers()?
105 .attach(database.clone())
107 .with_adjusted_configs()
109 .with_provider_factory().await?
111 .inspect(|_| {
112 info!(target: "reth::cli", "Database opened");
113 })
114 .with_prometheus_server().await?
115 .inspect(|this| {
116 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
117 })
118 .with_genesis()?
119 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
120 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
121 })
122 .with_metrics_task()
123 .with_blockchain_db::<T, _>(move |provider_factory| {
126 Ok(BlockchainProvider::new(provider_factory)?)
127 })?
128 .with_components(components_builder, on_component_initialized).await?;
129
130 let exex_manager_handle = ExExLauncher::new(
132 ctx.head(),
133 ctx.node_adapter().clone(),
134 installed_exex,
135 ctx.configs().clone(),
136 )
137 .launch()
138 .await?;
139
140 let network_client = ctx.components().network().fetch_client().await?;
142 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
143
144 let node_config = ctx.node_config();
145
146 let max_block = ctx.max_block(network_client.clone()).await?;
147
148 let static_file_producer = ctx.static_file_producer();
149 let static_file_producer_events = static_file_producer.lock().events();
150 info!(target: "reth::cli", "StaticFileProducer initialized");
151
152 let consensus = Arc::new(ctx.components().consensus().clone());
153
154 let pipeline_exex_handle =
156 exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
157 let pipeline = build_networked_pipeline(
158 &ctx.toml_config().stages,
159 network_client.clone(),
160 consensus.clone(),
161 ctx.provider_factory().clone(),
162 ctx.task_executor(),
163 ctx.sync_metrics_tx(),
164 ctx.prune_config(),
165 max_block,
166 static_file_producer,
167 ctx.components().block_executor().clone(),
168 pipeline_exex_handle,
169 )?;
170
171 pipeline.move_to_static_files()?;
173
174 let pipeline_events = pipeline.events();
175
176 let mut pruner_builder = ctx.pruner_builder();
177 if let Some(exex_manager_handle) = &exex_manager_handle {
178 pruner_builder =
179 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
180 }
181 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
182 let pruner_events = pruner.events();
183 info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
184
185 let event_sender = EventSender::default();
186
187 let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
188
189 let jwt_secret = ctx.auth_jwt_secret()?;
191
192 let add_ons_ctx = AddOnsContext {
193 node: ctx.node_adapter().clone(),
194 config: ctx.node_config(),
195 beacon_engine_handle: beacon_engine_handle.clone(),
196 jwt_secret,
197 engine_events: event_sender.clone(),
198 };
199 let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
200
201 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
202 .maybe_skip_fcu(node_config.debug.skip_fcu)
203 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
204 .maybe_reorg(
205 ctx.blockchain_db().clone(),
206 ctx.components().evm_config().clone(),
207 engine_payload_validator.clone(),
208 node_config.debug.reorg_frequency,
209 node_config.debug.reorg_depth,
210 )
211 .maybe_store_messages(node_config.debug.engine_api_store.clone());
215
216 let mut engine_service = if ctx.is_dev() {
217 let eth_service = LocalEngineService::new(
218 consensus.clone(),
219 ctx.components().block_executor().clone(),
220 ctx.provider_factory().clone(),
221 ctx.blockchain_db().clone(),
222 pruner,
223 ctx.components().payload_builder_handle().clone(),
224 engine_payload_validator,
225 engine_tree_config,
226 ctx.invalid_block_hook()?,
227 ctx.sync_metrics_tx(),
228 consensus_engine_tx.clone(),
229 Box::pin(consensus_engine_stream),
230 ctx.dev_mining_mode(ctx.components().pool()),
231 LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
232 ctx.components().evm_config().clone(),
233 );
234
235 Either::Left(eth_service)
236 } else {
237 let eth_service = EngineService::new(
238 consensus.clone(),
239 ctx.components().block_executor().clone(),
240 ctx.chain_spec(),
241 network_client.clone(),
242 Box::pin(consensus_engine_stream),
243 pipeline,
244 Box::new(ctx.task_executor().clone()),
245 ctx.provider_factory().clone(),
246 ctx.blockchain_db().clone(),
247 pruner,
248 ctx.components().payload_builder_handle().clone(),
249 engine_payload_validator,
250 engine_tree_config,
251 ctx.invalid_block_hook()?,
252 ctx.sync_metrics_tx(),
253 ctx.components().evm_config().clone(),
254 );
255
256 Either::Right(eth_service)
257 };
258
259 info!(target: "reth::cli", "Consensus engine initialized");
260
261 let events = stream_select!(
262 event_sender.new_listener().map(Into::into),
263 pipeline_events.map(Into::into),
264 if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
265 Either::Left(
266 ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
267 .map(Into::into),
268 )
269 } else {
270 Either::Right(stream::empty())
271 },
272 pruner_events.map(Into::into),
273 static_file_producer_events.map(Into::into),
274 );
275
276 ctx.task_executor().spawn_critical(
277 "events task",
278 node::handle_events(
279 Some(Box::new(ctx.components().network().clone())),
280 Some(ctx.head().number),
281 events,
282 ),
283 );
284
285 let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
286 add_ons.launch_add_ons(add_ons_ctx).await?;
287
288 let initial_target = ctx.initial_backfill_target()?;
290 let network_handle = ctx.components().network().clone();
291 let mut built_payloads = ctx
292 .components()
293 .payload_builder_handle()
294 .subscribe()
295 .await
296 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
297 .into_built_payload_stream()
298 .fuse();
299 let chainspec = ctx.chain_spec();
300 let (exit, rx) = oneshot::channel();
301 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
302
303 info!(target: "reth::cli", "Starting consensus engine");
304 ctx.task_executor().spawn_critical("consensus engine", async move {
305 if let Some(initial_target) = initial_target {
306 debug!(target: "reth::cli", %initial_target, "start backfill sync");
307 if let Either::Right(eth_service) = &mut engine_service {
308 eth_service.orchestrator_mut().start_backfill_sync(initial_target);
309 }
310 }
311
312 let mut res = Ok(());
313
314 loop {
316 tokio::select! {
317 payload = built_payloads.select_next_some() => {
318 if let Some(executed_block) = payload.executed_block() {
319 debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(), "inserting built payload");
320 if let Either::Right(eth_service) = &mut engine_service {
321 eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
322 }
323 }
324 }
325 event = engine_service.next() => {
326 let Some(event) = event else { break };
327 debug!(target: "reth::cli", "Event: {event}");
328 match event {
329 ChainEvent::BackfillSyncFinished => {
330 if terminate_after_backfill {
331 debug!(target: "reth::cli", "Terminating after initial backfill");
332 break
333 }
334
335 network_handle.update_sync_state(SyncState::Idle);
336 }
337 ChainEvent::BackfillSyncStarted => {
338 network_handle.update_sync_state(SyncState::Syncing);
339 }
340 ChainEvent::FatalError => {
341 error!(target: "reth::cli", "Fatal error in consensus engine");
342 res = Err(eyre::eyre!("Fatal error in consensus engine"));
343 break
344 }
345 ChainEvent::Handler(ev) => {
346 if let Some(head) = ev.canonical_header() {
347 let head_block = Head {
348 number: head.number(),
349 hash: head.hash(),
350 difficulty: head.difficulty(),
351 timestamp: head.timestamp(),
352 total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
353 };
354 network_handle.update_status(head_block);
355 }
356 event_sender.notify(ev);
357 }
358 }
359 }
360 }
361 }
362
363 let _ = exit.send(res);
364 });
365
366 let full_node = FullNode {
367 evm_config: ctx.components().evm_config().clone(),
368 block_executor: ctx.components().block_executor().clone(),
369 pool: ctx.components().pool().clone(),
370 network: ctx.components().network().clone(),
371 provider: ctx.node_adapter().provider.clone(),
372 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
373 task_executor: ctx.task_executor().clone(),
374 config: ctx.node_config().clone(),
375 data_dir: ctx.data_dir().clone(),
376 add_ons_handle: RpcHandle {
377 rpc_server_handles,
378 rpc_registry,
379 engine_events,
380 beacon_engine_handle,
381 },
382 };
383 on_node_started.on_event(FullNode::clone(&full_node))?;
385
386 let handle = NodeHandle {
387 node_exit_future: NodeExitFuture::new(
388 async { rx.await? },
389 full_node.config.debug.terminate,
390 ),
391 node: full_node,
392 };
393
394 Ok(handle)
395 }
396}