1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_service::service::{ChainEvent, EngineService};
15use reth_engine_tree::{
16 engine::{EngineApiRequest, EngineRequestHandler},
17 tree::TreeConfig,
18};
19use reth_engine_util::EngineMessageStreamExt;
20use reth_exex::ExExManagerHandle;
21use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
22use reth_network_api::BlockDownloaderProvider;
23use reth_node_api::{
24 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
25};
26use reth_node_core::{
27 dirs::{ChainPath, DataDirPath},
28 exit::NodeExitFuture,
29 primitives::Head,
30};
31use reth_node_events::node;
32use reth_provider::{
33 providers::{BlockchainProvider, NodeTypesForProvider},
34 BlockNumReader, MetadataProvider,
35};
36use reth_tasks::TaskExecutor;
37use reth_tokio_util::EventSender;
38use reth_tracing::tracing::{debug, error, info};
39use std::{future::Future, pin::Pin, sync::Arc};
40use tokio::sync::{mpsc::unbounded_channel, oneshot};
41use tokio_stream::wrappers::UnboundedReceiverStream;
42use tracing::warn;
43
44#[derive(Debug)]
46pub struct EngineNodeLauncher {
47 pub ctx: LaunchContext,
49
50 pub engine_tree_config: TreeConfig,
53}
54
55impl EngineNodeLauncher {
56 pub const fn new(
58 task_executor: TaskExecutor,
59 data_dir: ChainPath<DataDirPath>,
60 engine_tree_config: TreeConfig,
61 ) -> Self {
62 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
63 }
64
65 async fn launch_node<T, CB, AO>(
66 self,
67 target: NodeBuilderWithComponents<T, CB, AO>,
68 ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
69 where
70 T: FullNodeTypes<
71 Types: NodeTypesForProvider,
72 Provider = BlockchainProvider<
73 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
74 >,
75 >,
76 CB: NodeComponentsBuilder<T>,
77 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
78 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
79 {
80 let Self { ctx, engine_tree_config } = self;
81 let NodeBuilderWithComponents {
82 adapter: NodeTypesAdapter { database },
83 components_builder,
84 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
85 config,
86 } = target;
87 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
88
89 let ctx = ctx
91 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
92 .with_loaded_toml_config(config)?
94 .with_resolved_peers()?
96 .attach(database.clone())
98 .with_adjusted_configs()
100 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
102 .inspect(|ctx| {
103 info!(target: "reth::cli", "Database opened");
104 match ctx.provider_factory().storage_settings() {
105 Ok(settings) => {
106 info!(
107 target: "reth::cli",
108 ?settings,
109 "Storage settings"
110 );
111 },
112 Err(err) => {
113 warn!(
114 target: "reth::cli",
115 ?err,
116 "Failed to get storage settings"
117 );
118 },
119 }
120 })
121 .with_prometheus_server().await?
122 .inspect(|this| {
123 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
124 })
125 .with_genesis()?
126 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
127 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
128 })
129 .with_metrics_task()
130 .with_blockchain_db::<T, _>(move |provider_factory| {
133 Ok(BlockchainProvider::new(provider_factory)?)
134 })?
135 .with_components(components_builder, on_component_initialized).await?;
136
137 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
139
140 let network_handle = ctx.components().network().clone();
142 let network_client = network_handle.fetch_client().await?;
143 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
144
145 let node_config = ctx.node_config();
146
147 network_handle.update_sync_state(SyncState::Syncing);
149
150 let max_block = ctx.max_block(network_client.clone()).await?;
151
152 let static_file_producer = ctx.static_file_producer();
153 let static_file_producer_events = static_file_producer.lock().events();
154 info!(target: "reth::cli", "StaticFileProducer initialized");
155
156 let consensus = Arc::new(ctx.components().consensus().clone());
157
158 let pipeline = build_networked_pipeline(
159 &ctx.toml_config().stages,
160 network_client.clone(),
161 consensus.clone(),
162 ctx.provider_factory().clone(),
163 ctx.task_executor(),
164 ctx.sync_metrics_tx(),
165 ctx.prune_config(),
166 max_block,
167 static_file_producer,
168 ctx.components().evm_config().clone(),
169 maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
170 ctx.era_import_source(),
171 )?;
172
173 pipeline.move_to_static_files()?;
175
176 let pipeline_events = pipeline.events();
177
178 let mut pruner_builder = ctx.pruner_builder();
179 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
180 pruner_builder =
181 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
182 }
183 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
184 let pruner_events = pruner.events();
185 info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
186
187 let event_sender = EventSender::default();
188
189 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
190
191 let jwt_secret = ctx.auth_jwt_secret()?;
193
194 let add_ons_ctx = AddOnsContext {
195 node: ctx.node_adapter().clone(),
196 config: ctx.node_config(),
197 beacon_engine_handle: beacon_engine_handle.clone(),
198 jwt_secret,
199 engine_events: event_sender.clone(),
200 };
201 let validator_builder = add_ons.engine_validator_builder();
202
203 let engine_validator = validator_builder
205 .clone()
206 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
207 .await?;
208
209 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
211 .maybe_skip_fcu(node_config.debug.skip_fcu)
212 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
213 .maybe_reorg(
214 ctx.blockchain_db().clone(),
215 ctx.components().evm_config().clone(),
216 || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
217 node_config.debug.reorg_frequency,
218 node_config.debug.reorg_depth,
219 )
220 .await?
221 .maybe_store_messages(node_config.debug.engine_api_store.clone());
225
226 let mut engine_service = EngineService::new(
227 consensus.clone(),
228 ctx.chain_spec(),
229 network_client.clone(),
230 Box::pin(consensus_engine_stream),
231 pipeline,
232 Box::new(ctx.task_executor().clone()),
233 ctx.provider_factory().clone(),
234 ctx.blockchain_db().clone(),
235 pruner,
236 ctx.components().payload_builder_handle().clone(),
237 engine_validator,
238 engine_tree_config,
239 ctx.sync_metrics_tx(),
240 ctx.components().evm_config().clone(),
241 );
242
243 info!(target: "reth::cli", "Consensus engine initialized");
244
245 #[allow(clippy::needless_continue)]
246 let events = stream_select!(
247 event_sender.new_listener().map(Into::into),
248 pipeline_events.map(Into::into),
249 ctx.consensus_layer_events(),
250 pruner_events.map(Into::into),
251 static_file_producer_events.map(Into::into),
252 );
253
254 ctx.task_executor().spawn_critical(
255 "events task",
256 Box::pin(node::handle_events(
257 Some(Box::new(ctx.components().network().clone())),
258 Some(ctx.head().number),
259 events,
260 )),
261 );
262
263 let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
264 add_ons.launch_add_ons(add_ons_ctx).await?;
265
266 let initial_target = ctx.initial_backfill_target()?;
268 let mut built_payloads = ctx
269 .components()
270 .payload_builder_handle()
271 .subscribe()
272 .await
273 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
274 .into_built_payload_stream()
275 .fuse();
276
277 let chainspec = ctx.chain_spec();
278 let provider = ctx.blockchain_db().clone();
279 let (exit, rx) = oneshot::channel();
280 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
281 let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
282
283 info!(target: "reth::cli", "Starting consensus engine");
284 ctx.task_executor().spawn_critical("consensus engine", Box::pin(async move {
285 if let Some(initial_target) = initial_target {
286 debug!(target: "reth::cli", %initial_target, "start backfill sync");
287 engine_service.orchestrator_mut().start_backfill_sync(initial_target);
289 } else if startup_sync_state_idle {
290 network_handle.update_sync_state(SyncState::Idle);
291 }
292
293 let mut res = Ok(());
294
295 loop {
297 tokio::select! {
298 payload = built_payloads.select_next_some() => {
299 if let Some(executed_block) = payload.executed_block() {
300 debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
301 engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
302 }
303 }
304 event = engine_service.next() => {
305 let Some(event) = event else { break };
306 debug!(target: "reth::cli", "Event: {event}");
307 match event {
308 ChainEvent::BackfillSyncFinished => {
309 if terminate_after_backfill {
310 debug!(target: "reth::cli", "Terminating after initial backfill");
311 break
312 }
313 if startup_sync_state_idle {
314 network_handle.update_sync_state(SyncState::Idle);
315 }
316 }
317 ChainEvent::BackfillSyncStarted => {
318 network_handle.update_sync_state(SyncState::Syncing);
319 }
320 ChainEvent::FatalError => {
321 error!(target: "reth::cli", "Fatal error in consensus engine");
322 res = Err(eyre::eyre!("Fatal error in consensus engine"));
323 break
324 }
325 ChainEvent::Handler(ev) => {
326 if let Some(head) = ev.canonical_header() {
327 network_handle.update_sync_state(SyncState::Idle);
329 let head_block = Head {
330 number: head.number(),
331 hash: head.hash(),
332 difficulty: head.difficulty(),
333 timestamp: head.timestamp(),
334 total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
335 };
336 network_handle.update_status(head_block);
337
338 let updated = BlockRangeUpdate {
339 earliest: provider.earliest_block_number().unwrap_or_default(),
340 latest:head.number(),
341 latest_hash:head.hash()
342 };
343 network_handle.update_block_range(updated);
344 }
345 event_sender.notify(ev);
346 }
347 }
348 }
349 }
350 }
351
352 let _ = exit.send(res);
353 }));
354
355 let full_node = FullNode {
356 evm_config: ctx.components().evm_config().clone(),
357 pool: ctx.components().pool().clone(),
358 network: ctx.components().network().clone(),
359 provider: ctx.node_adapter().provider.clone(),
360 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
361 task_executor: ctx.task_executor().clone(),
362 config: ctx.node_config().clone(),
363 data_dir: ctx.data_dir().clone(),
364 add_ons_handle: RpcHandle {
365 rpc_server_handles,
366 rpc_registry,
367 engine_events,
368 beacon_engine_handle,
369 },
370 };
371 on_node_started.on_event(FullNode::clone(&full_node))?;
373
374 ctx.spawn_ethstats().await?;
375
376 let handle = NodeHandle {
377 node_exit_future: NodeExitFuture::new(
378 async { rx.await? },
379 full_node.config.debug.terminate,
380 ),
381 node: full_node,
382 };
383
384 Ok(handle)
385 }
386}
387
388impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
389where
390 T: FullNodeTypes<
391 Types: NodeTypesForProvider,
392 Provider = BlockchainProvider<
393 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
394 >,
395 >,
396 CB: NodeComponentsBuilder<T> + 'static,
397 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
398 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
399 + 'static,
400{
401 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
402 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
403
404 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
405 Box::pin(self.launch_node(target))
406 }
407}