1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_service::service::{ChainEvent, EngineService};
15use reth_engine_tree::{
16 engine::{EngineApiRequest, EngineRequestHandler},
17 tree::TreeConfig,
18};
19use reth_engine_util::EngineMessageStreamExt;
20use reth_exex::ExExManagerHandle;
21use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
22use reth_network_api::BlockDownloaderProvider;
23use reth_node_api::{
24 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
25};
26use reth_node_core::{
27 dirs::{ChainPath, DataDirPath},
28 exit::NodeExitFuture,
29 primitives::Head,
30};
31use reth_node_events::node;
32use reth_provider::{
33 providers::{BlockchainProvider, NodeTypesForProvider},
34 BlockNumReader,
35};
36use reth_tasks::TaskExecutor;
37use reth_tokio_util::EventSender;
38use reth_tracing::tracing::{debug, error, info};
39use std::{future::Future, pin::Pin, sync::Arc};
40use tokio::sync::{mpsc::unbounded_channel, oneshot};
41use tokio_stream::wrappers::UnboundedReceiverStream;
42
/// Launcher that boots a node around the engine service.
///
/// The actual start-up sequence lives in the inherent `launch_node` method; this type only
/// carries the state that sequence consumes.
#[derive(Debug)]
pub struct EngineNodeLauncher {
    /// Launch context the node is built up from. `new` creates it from the task executor and
    /// the data directory.
    pub ctx: LaunchContext,

    /// Engine tree configuration. During launch it supplies the reserved CPU core count and is
    /// handed to both the tree validator builder and the engine service.
    pub engine_tree_config: TreeConfig,
}
53
impl EngineNodeLauncher {
    /// Creates a new launcher.
    ///
    /// `task_executor` and `data_dir` are wrapped into a fresh [`LaunchContext`];
    /// `engine_tree_config` is stored as-is.
    pub const fn new(
        task_executor: TaskExecutor,
        data_dir: ChainPath<DataDirPath>,
        engine_tree_config: TreeConfig,
    ) -> Self {
        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
    }

    /// Launches the node described by `target` and returns a handle to it.
    ///
    /// In order, this:
    /// 1. extends the launch context (config, peers, database, provider factory, prometheus
    ///    server, genesis, metrics task, blockchain provider, components),
    /// 2. launches the installed ExExs,
    /// 3. builds the networked pipeline and the pruner,
    /// 4. builds the engine validator, the engine message stream and the [`EngineService`],
    /// 5. spawns the "events task" and launches the add-ons,
    /// 6. spawns the "consensus engine" task that drives the engine service,
    /// 7. assembles the [`FullNode`], fires the `on_node_started` hook, spawns ethstats and
    ///    returns the [`NodeHandle`].
    ///
    /// # Errors
    ///
    /// Returns an error as soon as any fallible step (every `?` below) fails.
    async fn launch_node<T, CB, AO>(
        self,
        target: NodeBuilderWithComponents<T, CB, AO>,
    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
    where
        T: FullNodeTypes<
            Types: NodeTypesForProvider,
            Provider = BlockchainProvider<
                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
            >,
        >,
        CB: NodeComponentsBuilder<T>,
        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
    {
        // Take the launcher and the builder apart so the individual pieces can be moved into
        // the steps below.
        let Self { ctx, engine_tree_config } = self;
        let NodeBuilderWithComponents {
            adapter: NodeTypesAdapter { database },
            components_builder,
            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
            config,
        } = target;
        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;

        // Extend the launch context step by step; the binding is shadowed with the final,
        // fully extended context.
        let ctx = ctx
            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
            // load the toml config
            .with_loaded_toml_config(config)?
            // resolve peers
            .with_resolved_peers()?
            // attach the database
            .attach(database.clone())
            // NOTE(review): presumably makes config adjustments that depend on earlier steps —
            // confirm against `with_adjusted_configs`.
            .with_adjusted_configs()
            // create the provider factory, parameterised by the components' EVM type
            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
            .inspect(|_| {
                info!(target: "reth::cli", "Database opened");
            })
            .with_prometheus_server().await?
            .inspect(|this| {
                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
            })
            .with_genesis()?
            // The explicit closure parameter type pins the context state at this point of the
            // chain.
            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
            })
            .with_metrics_task()
            // `T` (the `FullNodeTypes`) is passed explicitly as the first type parameter.
            .with_blockchain_db::<T, _>(move |provider_factory| {
                Ok(BlockchainProvider::new(provider_factory)?)
            })?
            .with_components(components_builder, on_component_initialized).await?;

        // Launch the installed ExExs; the result is an optional manager handle.
        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;

        // Network handles and the channel that carries engine messages: the sender is wrapped in
        // a `ConsensusEngineHandle` below, the receiver becomes the engine service's input
        // stream.
        let network_handle = ctx.components().network().clone();
        let network_client = network_handle.fetch_client().await?;
        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

        let node_config = ctx.node_config();

        // The network starts out in the `Syncing` state; the consensus engine task below moves
        // it to `Idle`.
        network_handle.update_sync_state(SyncState::Syncing);

        let max_block = ctx.max_block(network_client.clone()).await?;

        let static_file_producer = ctx.static_file_producer();
        let static_file_producer_events = static_file_producer.lock().events();
        info!(target: "reth::cli", "StaticFileProducer initialized");

        let consensus = Arc::new(ctx.components().consensus().clone());

        let pipeline = build_networked_pipeline(
            &ctx.toml_config().stages,
            network_client.clone(),
            consensus.clone(),
            ctx.provider_factory().clone(),
            ctx.task_executor(),
            ctx.sync_metrics_tx(),
            ctx.prune_config(),
            max_block,
            static_file_producer,
            ctx.components().evm_config().clone(),
            // Fall back to an empty handle when no ExEx manager was launched.
            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
            ctx.era_import_source(),
        )?;

        // NOTE(review): presumably brings static files up to date before the engine starts —
        // confirm against `Pipeline::move_to_static_files`.
        pipeline.move_to_static_files()?;

        let pipeline_events = pipeline.events();

        // Build the pruner; when an ExEx manager exists, its finished height is handed to the
        // builder.
        let mut pruner_builder = ctx.pruner_builder();
        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
            pruner_builder =
                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
        }
        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
        let pruner_events = pruner.events();
        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");

        // Sender for engine events: listened to by the events task, handed to the add-ons and
        // notified from the consensus engine task.
        let event_sender = EventSender::default();

        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());

        // JWT secret passed on to the add-ons context.
        let jwt_secret = ctx.auth_jwt_secret()?;

        let add_ons_ctx = AddOnsContext {
            node: ctx.node_adapter().clone(),
            config: ctx.node_config(),
            beacon_engine_handle: beacon_engine_handle.clone(),
            jwt_secret,
            engine_events: event_sender.clone(),
        };
        let validator_builder = add_ons.engine_validator_builder();

        // Validator used by the engine service. The builder is cloned because it is used again
        // in the `maybe_reorg` closure below.
        let engine_validator = validator_builder
            .clone()
            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
            .await?;

        // Engine message stream: the raw receiver wrapped in the debug adapters selected by the
        // node's debug configuration.
        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
            .maybe_skip_fcu(node_config.debug.skip_fcu)
            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
            .maybe_reorg(
                ctx.blockchain_db().clone(),
                ctx.components().evm_config().clone(),
                || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
                node_config.debug.reorg_frequency,
                node_config.debug.reorg_depth,
            )
            .await?
            // Applied last, i.e. on top of the skip/reorg adapters — presumably so that only
            // messages the engine actually observes get stored; confirm.
            .maybe_store_messages(node_config.debug.engine_api_store.clone());

        let mut engine_service = EngineService::new(
            consensus.clone(),
            ctx.chain_spec(),
            network_client.clone(),
            Box::pin(consensus_engine_stream),
            pipeline,
            Box::new(ctx.task_executor().clone()),
            ctx.provider_factory().clone(),
            ctx.blockchain_db().clone(),
            pruner,
            ctx.components().payload_builder_handle().clone(),
            engine_validator,
            engine_tree_config,
            ctx.sync_metrics_tx(),
            ctx.components().evm_config().clone(),
        );

        info!(target: "reth::cli", "Consensus engine initialized");

        // Merge all event sources into one stream for the events task.
        // NOTE(review): the `allow` presumably silences a lint triggered inside the
        // `stream_select!` expansion — confirm.
        #[allow(clippy::needless_continue)]
        let events = stream_select!(
            event_sender.new_listener().map(Into::into),
            pipeline_events.map(Into::into),
            ctx.consensus_layer_events(),
            pruner_events.map(Into::into),
            static_file_producer_events.map(Into::into),
        );

        ctx.task_executor().spawn_critical(
            "events task",
            Box::pin(node::handle_events(
                Some(Box::new(ctx.components().network().clone())),
                Some(ctx.head().number),
                events,
            )),
        );

        // Launch the add-ons. The returned `beacon_engine_handle` and `engine_events` shadow /
        // replace the local ones for the `FullNode` assembled below.
        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
            add_ons.launch_add_ons(add_ons_ctx).await?;

        // Everything the consensus engine task needs is collected here, before the `async move`
        // block takes ownership.
        let initial_target = ctx.initial_backfill_target()?;
        let mut built_payloads = ctx
            .components()
            .payload_builder_handle()
            .subscribe()
            .await
            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
            .into_built_payload_stream()
            .fuse();

        let chainspec = ctx.chain_spec();
        let provider = ctx.blockchain_db().clone();
        // `exit` reports the engine task's final result; `rx` is awaited by the node exit future.
        let (exit, rx) = oneshot::channel();
        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;

        info!(target: "reth::cli", "Starting consensus engine");
        ctx.task_executor().spawn_critical("consensus engine", Box::pin(async move {
            if let Some(initial_target) = initial_target {
                debug!(target: "reth::cli", %initial_target, "start backfill sync");
                // The sync state was already set to `Syncing` above, so it is not touched here.
                engine_service.orchestrator_mut().start_backfill_sync(initial_target);
            } else if startup_sync_state_idle {
                network_handle.update_sync_state(SyncState::Idle);
            }

            // Final result of this task; only `ChainEvent::FatalError` turns it into an error.
            let mut res = Ok(());

            // Drive the engine service and, concurrently, feed locally built payloads into it.
            loop {
                tokio::select! {
                    payload = built_payloads.select_next_some() => {
                        // Hand the executed block of a locally built payload to the engine
                        // handler — presumably to avoid re-executing it when it later arrives
                        // via the engine API; confirm.
                        if let Some(executed_block) = payload.executed_block() {
                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(), "inserting built payload");
                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
                        }
                    }
                    event = engine_service.next() => {
                        // The engine service stream ended: leave the loop with the current `res`.
                        let Some(event) = event else { break };
                        debug!(target: "reth::cli", "Event: {event}");
                        match event {
                            ChainEvent::BackfillSyncFinished => {
                                if terminate_after_backfill {
                                    debug!(target: "reth::cli", "Terminating after initial backfill");
                                    break
                                }
                                if startup_sync_state_idle {
                                    network_handle.update_sync_state(SyncState::Idle);
                                }
                            }
                            ChainEvent::BackfillSyncStarted => {
                                network_handle.update_sync_state(SyncState::Syncing);
                            }
                            ChainEvent::FatalError => {
                                error!(target: "reth::cli", "Fatal error in consensus engine");
                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
                                break
                            }
                            ChainEvent::Handler(ev) => {
                                // Events carrying a canonical header update the network's sync
                                // state, status and advertised block range.
                                if let Some(head) = ev.canonical_header() {
                                    network_handle.update_sync_state(SyncState::Idle);
                                    let head_block = Head {
                                        number: head.number(),
                                        hash: head.hash(),
                                        difficulty: head.difficulty(),
                                        timestamp: head.timestamp(),
                                        // Final Paris total difficulty if Paris is active at the
                                        // head's block number, otherwise the default value.
                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
                                    };
                                    network_handle.update_status(head_block);

                                    let updated = BlockRangeUpdate {
                                        // A failed lookup falls back to the default value.
                                        earliest: provider.earliest_block_number().unwrap_or_default(),
                                        latest:head.number(),
                                        latest_hash:head.hash()
                                    };
                                    network_handle.update_block_range(updated);
                                }
                                // Every handler event is forwarded to the event listeners.
                                event_sender.notify(ev);
                            }
                        }
                    }
                }
            }

            // A send error (receiver already dropped) is deliberately ignored.
            let _ = exit.send(res);
        }));

        let full_node = FullNode {
            evm_config: ctx.components().evm_config().clone(),
            pool: ctx.components().pool().clone(),
            network: ctx.components().network().clone(),
            provider: ctx.node_adapter().provider.clone(),
            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
            task_executor: ctx.task_executor().clone(),
            config: ctx.node_config().clone(),
            data_dir: ctx.data_dir().clone(),
            add_ons_handle: RpcHandle {
                rpc_server_handles,
                rpc_registry,
                engine_events,
                beacon_engine_handle,
            },
        };
        // Fire the `on_node_started` hook with a clone of the node.
        on_node_started.on_event(FullNode::clone(&full_node))?;

        ctx.spawn_ethstats().await?;

        let handle = NodeHandle {
            // The exit future awaits the result sent by the consensus engine task; the inner `?`
            // converts a receive error into an error result.
            node_exit_future: NodeExitFuture::new(
                async { rx.await? },
                full_node.config.debug.terminate,
            ),
            node: full_node,
        };

        Ok(handle)
    }
}
370
371impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
372where
373 T: FullNodeTypes<
374 Types: NodeTypesForProvider,
375 Provider = BlockchainProvider<
376 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
377 >,
378 >,
379 CB: NodeComponentsBuilder<T> + 'static,
380 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
381 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
382 + 'static,
383{
384 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
385 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
386
387 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
388 Box::pin(self.launch_node(target))
389 }
390}