1use crate::{
4 common::{Attached, LaunchContextWith, WithConfigs},
5 hooks::NodeHooks,
6 rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7 setup::build_networked_pipeline,
8 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_db_api::{database_metrics::DatabaseMetrics, Database};
15use reth_engine_service::service::{ChainEvent, EngineService};
16use reth_engine_tree::{
17 engine::{EngineApiRequest, EngineRequestHandler},
18 tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28 dirs::{ChainPath, DataDirPath},
29 exit::NodeExitFuture,
30 primitives::Head,
31};
32use reth_node_events::node;
33use reth_provider::{
34 providers::{BlockchainProvider, NodeTypesForProvider},
35 BlockNumReader,
36};
37use reth_tasks::TaskExecutor;
38use reth_tokio_util::EventSender;
39use reth_tracing::tracing::{debug, error, info};
40use std::sync::Arc;
41use tokio::sync::{mpsc::unbounded_channel, oneshot};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43
44#[derive(Debug)]
46pub struct EngineNodeLauncher {
47 pub ctx: LaunchContext,
49
50 pub engine_tree_config: TreeConfig,
53}
54
55impl EngineNodeLauncher {
56 pub const fn new(
58 task_executor: TaskExecutor,
59 data_dir: ChainPath<DataDirPath>,
60 engine_tree_config: TreeConfig,
61 ) -> Self {
62 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
63 }
64}
65
66impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
67where
68 Types: NodeTypesForProvider + NodeTypes,
69 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
70 T: FullNodeTypes<
71 Types = Types,
72 DB = DB,
73 Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
74 >,
75 CB: NodeComponentsBuilder<T>,
76 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
77 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
78{
79 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
80
81 async fn launch_node(
82 self,
83 target: NodeBuilderWithComponents<T, CB, AO>,
84 ) -> eyre::Result<Self::Node> {
85 let Self { ctx, engine_tree_config } = self;
86 let NodeBuilderWithComponents {
87 adapter: NodeTypesAdapter { database },
88 components_builder,
89 add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
90 config,
91 } = target;
92 let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
93
94 let ctx = ctx
96 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
97 .with_loaded_toml_config(config)?
99 .with_resolved_peers()?
101 .attach(database.clone())
103 .with_adjusted_configs()
105 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
107 .inspect(|_| {
108 info!(target: "reth::cli", "Database opened");
109 })
110 .with_prometheus_server().await?
111 .inspect(|this| {
112 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
113 })
114 .with_genesis()?
115 .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
116 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
117 })
118 .with_metrics_task()
119 .with_blockchain_db::<T, _>(move |provider_factory| {
122 Ok(BlockchainProvider::new(provider_factory)?)
123 })?
124 .with_components(components_builder, on_component_initialized).await?;
125
126 ctx.expire_pre_merge_transactions()?;
128
129 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
131
132 let network_handle = ctx.components().network().clone();
134 let network_client = network_handle.fetch_client().await?;
135 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
136
137 let node_config = ctx.node_config();
138
139 network_handle.update_sync_state(SyncState::Syncing);
141
142 let max_block = ctx.max_block(network_client.clone()).await?;
143
144 let static_file_producer = ctx.static_file_producer();
145 let static_file_producer_events = static_file_producer.lock().events();
146 info!(target: "reth::cli", "StaticFileProducer initialized");
147
148 let consensus = Arc::new(ctx.components().consensus().clone());
149
150 let pipeline = build_networked_pipeline(
151 &ctx.toml_config().stages,
152 network_client.clone(),
153 consensus.clone(),
154 ctx.provider_factory().clone(),
155 ctx.task_executor(),
156 ctx.sync_metrics_tx(),
157 ctx.prune_config(),
158 max_block,
159 static_file_producer,
160 ctx.components().evm_config().clone(),
161 maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
162 ctx.era_import_source(),
163 )?;
164
165 pipeline.move_to_static_files()?;
167
168 let pipeline_events = pipeline.events();
169
170 let mut pruner_builder = ctx.pruner_builder();
171 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
172 pruner_builder =
173 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
174 }
175 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
176 let pruner_events = pruner.events();
177 info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
178
179 let event_sender = EventSender::default();
180
181 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
182
183 let jwt_secret = ctx.auth_jwt_secret()?;
185
186 let add_ons_ctx = AddOnsContext {
187 node: ctx.node_adapter().clone(),
188 config: ctx.node_config(),
189 beacon_engine_handle: beacon_engine_handle.clone(),
190 jwt_secret,
191 engine_events: event_sender.clone(),
192 };
193 let validator_builder = add_ons.engine_validator_builder();
194
195 let engine_validator = validator_builder
197 .clone()
198 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
199 .await?;
200
201 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
203 .maybe_skip_fcu(node_config.debug.skip_fcu)
204 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
205 .maybe_reorg(
206 ctx.blockchain_db().clone(),
207 ctx.components().evm_config().clone(),
208 || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
209 node_config.debug.reorg_frequency,
210 node_config.debug.reorg_depth,
211 )
212 .await?
213 .maybe_store_messages(node_config.debug.engine_api_store.clone());
217
218 let mut engine_service = EngineService::new(
219 consensus.clone(),
220 ctx.chain_spec(),
221 network_client.clone(),
222 Box::pin(consensus_engine_stream),
223 pipeline,
224 Box::new(ctx.task_executor().clone()),
225 ctx.provider_factory().clone(),
226 ctx.blockchain_db().clone(),
227 pruner,
228 ctx.components().payload_builder_handle().clone(),
229 engine_validator,
230 engine_tree_config,
231 ctx.sync_metrics_tx(),
232 ctx.components().evm_config().clone(),
233 );
234
235 info!(target: "reth::cli", "Consensus engine initialized");
236
237 let events = stream_select!(
238 event_sender.new_listener().map(Into::into),
239 pipeline_events.map(Into::into),
240 ctx.consensus_layer_events(),
241 pruner_events.map(Into::into),
242 static_file_producer_events.map(Into::into),
243 );
244
245 ctx.task_executor().spawn_critical(
246 "events task",
247 Box::pin(node::handle_events(
248 Some(Box::new(ctx.components().network().clone())),
249 Some(ctx.head().number),
250 events,
251 )),
252 );
253
254 let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
255 add_ons.launch_add_ons(add_ons_ctx).await?;
256
257 let initial_target = ctx.initial_backfill_target()?;
259 let mut built_payloads = ctx
260 .components()
261 .payload_builder_handle()
262 .subscribe()
263 .await
264 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
265 .into_built_payload_stream()
266 .fuse();
267
268 let chainspec = ctx.chain_spec();
269 let provider = ctx.blockchain_db().clone();
270 let (exit, rx) = oneshot::channel();
271 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
272
273 info!(target: "reth::cli", "Starting consensus engine");
274 ctx.task_executor().spawn_critical("consensus engine", Box::pin(async move {
275 if let Some(initial_target) = initial_target {
276 debug!(target: "reth::cli", %initial_target, "start backfill sync");
277 engine_service.orchestrator_mut().start_backfill_sync(initial_target);
278 }
279
280 let mut res = Ok(());
281
282 loop {
284 tokio::select! {
285 payload = built_payloads.select_next_some() => {
286 if let Some(executed_block) = payload.executed_block() {
287 debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(), "inserting built payload");
288 engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
289 }
290 }
291 event = engine_service.next() => {
292 let Some(event) = event else { break };
293 debug!(target: "reth::cli", "Event: {event}");
294 match event {
295 ChainEvent::BackfillSyncFinished => {
296 if terminate_after_backfill {
297 debug!(target: "reth::cli", "Terminating after initial backfill");
298 break
299 }
300 }
301 ChainEvent::BackfillSyncStarted => {
302 network_handle.update_sync_state(SyncState::Syncing);
303 }
304 ChainEvent::FatalError => {
305 error!(target: "reth::cli", "Fatal error in consensus engine");
306 res = Err(eyre::eyre!("Fatal error in consensus engine"));
307 break
308 }
309 ChainEvent::Handler(ev) => {
310 if let Some(head) = ev.canonical_header() {
311 network_handle.update_sync_state(SyncState::Idle);
313 let head_block = Head {
314 number: head.number(),
315 hash: head.hash(),
316 difficulty: head.difficulty(),
317 timestamp: head.timestamp(),
318 total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
319 };
320 network_handle.update_status(head_block);
321
322 let updated = BlockRangeUpdate {
323 earliest: provider.earliest_block_number().unwrap_or_default(),
324 latest:head.number(),
325 latest_hash:head.hash()
326 };
327 network_handle.update_block_range(updated);
328 }
329 event_sender.notify(ev);
330 }
331 }
332 }
333 }
334 }
335
336 let _ = exit.send(res);
337 }));
338
339 let full_node = FullNode {
340 evm_config: ctx.components().evm_config().clone(),
341 pool: ctx.components().pool().clone(),
342 network: ctx.components().network().clone(),
343 provider: ctx.node_adapter().provider.clone(),
344 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
345 task_executor: ctx.task_executor().clone(),
346 config: ctx.node_config().clone(),
347 data_dir: ctx.data_dir().clone(),
348 add_ons_handle: RpcHandle {
349 rpc_server_handles,
350 rpc_registry,
351 engine_events,
352 beacon_engine_handle,
353 },
354 };
355 on_node_started.on_event(FullNode::clone(&full_node))?;
357
358 ctx.spawn_ethstats().await?;
359
360 let handle = NodeHandle {
361 node_exit_future: NodeExitFuture::new(
362 async { rx.await? },
363 full_node.config.debug.terminate,
364 ),
365 node: full_node,
366 };
367
368 Ok(handle)
369 }
370}