Skip to main content

reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_tree::{
15    chain::{ChainEvent, FromOrchestrator},
16    engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
17    launch::build_engine_orchestrator,
18    tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28    args::PruneConfigKind,
29    dirs::{ChainPath, DataDirPath},
30    exit::NodeExitFuture,
31    primitives::Head,
32};
33use reth_node_events::node;
34use reth_provider::{
35    providers::{BlockchainProvider, NodeTypesForProvider},
36    BlockNumReader, StorageSettingsCache,
37};
38use reth_tasks::TaskExecutor;
39use reth_tokio_util::EventSender;
40use reth_tracing::tracing::{debug, error, info};
41use reth_trie_db::ChangesetCache;
42use std::{future::Future, pin::Pin, sync::Arc};
43use tokio::sync::{mpsc::unbounded_channel, oneshot};
44use tokio_stream::wrappers::UnboundedReceiverStream;
45
46/// The engine node launcher.
47#[derive(Debug)]
48pub struct EngineNodeLauncher {
49    /// The task executor for the node.
50    pub ctx: LaunchContext,
51
52    /// Temporary configuration for engine tree.
53    /// After engine is stabilized, this should be configured through node builder.
54    pub engine_tree_config: TreeConfig,
55}
56
57impl EngineNodeLauncher {
58    /// Create a new instance of the ethereum node launcher.
59    pub const fn new(
60        task_executor: TaskExecutor,
61        data_dir: ChainPath<DataDirPath>,
62        engine_tree_config: TreeConfig,
63    ) -> Self {
64        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
65    }
66
67    async fn launch_node<T, CB, AO>(
68        self,
69        target: NodeBuilderWithComponents<T, CB, AO>,
70    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
71    where
72        T: FullNodeTypes<
73            Types: NodeTypesForProvider,
74            Provider = BlockchainProvider<
75                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
76            >,
77        >,
78        CB: NodeComponentsBuilder<T>,
79        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
80            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
81    {
82        let Self { ctx, engine_tree_config } = self;
83        let NodeBuilderWithComponents {
84            adapter: NodeTypesAdapter { database },
85            rocksdb_provider,
86            components_builder,
87            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
88            config,
89        } = target;
90        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
91
92        // Create changeset cache that will be shared across the engine
93        let changeset_cache = ChangesetCache::new();
94
95        // setup the launch context
96        let ctx = ctx
97            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
98            // load the toml config
99            .with_loaded_toml_config(config)?
100            // add resolved peers
101            .with_resolved_peers()?
102            // attach the database
103            .attach(database.clone())
104            // ensure certain settings take effect
105            .with_adjusted_configs()
106            // Create the provider factory with changeset cache
107            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
108            .inspect(|_| {
109                info!(target: "reth::cli", "Database opened");
110            })
111            .with_prometheus_server().await?
112            .inspect(|this| {
113                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
114            })
115            .with_genesis()?
116            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
117                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
118                let settings = this.provider_factory().cached_storage_settings();
119                let pruning_mode =
120                    PruneConfigKind::from_config(&this.prune_config(), this.chain_spec().as_ref()).as_str();
121                info!(target: "reth::cli", ?settings, ?pruning_mode, "Loaded storage settings");
122            })
123            .with_metrics_task()
124            // passing FullNodeTypes as type parameter here so that we can build
125            // later the components.
126            .with_blockchain_db::<T, _>(move |provider_factory| {
127                Ok(BlockchainProvider::new(provider_factory)?)
128            })?
129            .with_components(components_builder, on_component_initialized).await?;
130
131        // spawn exexs if any
132        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
133
134        // create pipeline
135        let network_handle = ctx.components().network().clone();
136        let network_client = network_handle.fetch_client().await?;
137        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
138
139        let node_config = ctx.node_config();
140
141        // We always assume that node is syncing after a restart
142        network_handle.update_sync_state(SyncState::Syncing);
143
144        let max_block = ctx.max_block(network_client.clone()).await?;
145
146        let static_file_producer = ctx.static_file_producer();
147        let static_file_producer_events = static_file_producer.lock().events();
148        info!(target: "reth::cli", "StaticFileProducer initialized");
149
150        let consensus = Arc::new(ctx.components().consensus().clone());
151
152        let pipeline = build_networked_pipeline(
153            &ctx.toml_config().stages,
154            network_client.clone(),
155            consensus.clone(),
156            ctx.provider_factory().clone(),
157            ctx.task_executor(),
158            ctx.sync_metrics_tx(),
159            ctx.prune_config(),
160            max_block,
161            static_file_producer,
162            ctx.components().evm_config().clone(),
163            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
164            ctx.era_import_source(),
165        )?;
166
167        // The new engine writes directly to static files. This ensures that they're up to the tip.
168        pipeline.move_to_static_files()?;
169
170        let pipeline_events = pipeline.events();
171
172        let mut pruner_builder = ctx.pruner_builder();
173        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
174            pruner_builder =
175                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
176        }
177        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
178        let pruner_events = pruner.events();
179        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
180
181        let event_sender = EventSender::default();
182
183        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
184
185        // extract the jwt secret from the args if possible
186        let jwt_secret = ctx.auth_jwt_secret()?;
187
188        let add_ons_ctx = AddOnsContext {
189            node: ctx.node_adapter().clone(),
190            config: ctx.node_config(),
191            beacon_engine_handle: beacon_engine_handle.clone(),
192            jwt_secret,
193            engine_events: event_sender.clone(),
194        };
195        let validator_builder = add_ons.engine_validator_builder();
196
197        // Build the engine validator with all required components
198        let engine_validator = validator_builder
199            .clone()
200            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
201            .await?;
202
203        // Create the consensus engine stream with optional reorg
204        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
205            .maybe_skip_fcu(node_config.debug.skip_fcu)
206            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
207            .maybe_reorg(
208                ctx.blockchain_db().clone(),
209                ctx.components().evm_config().clone(),
210                || async {
211                    validator_builder
212                        .build_tree_validator(
213                            &add_ons_ctx,
214                            engine_tree_config.clone(),
215                            changeset_cache.clone(),
216                        )
217                        .await
218                },
219                node_config.debug.reorg_frequency,
220                node_config.debug.reorg_depth,
221            )
222            .await?
223            // Store messages _after_ skipping so that `replay-engine` command
224            // would replay only the messages that were observed by the engine
225            // during this run.
226            .maybe_store_messages(node_config.debug.engine_api_store.clone());
227
228        let engine_kind = if ctx.chain_spec().is_optimism() {
229            EngineApiKind::OpStack
230        } else {
231            EngineApiKind::Ethereum
232        };
233
234        let mut orchestrator = build_engine_orchestrator(
235            engine_kind,
236            consensus.clone(),
237            network_client.clone(),
238            Box::pin(consensus_engine_stream),
239            pipeline,
240            ctx.task_executor().clone(),
241            ctx.provider_factory().clone(),
242            ctx.blockchain_db().clone(),
243            pruner,
244            ctx.components().payload_builder_handle().clone(),
245            engine_validator,
246            engine_tree_config,
247            ctx.sync_metrics_tx(),
248            ctx.components().evm_config().clone(),
249            changeset_cache,
250            ctx.task_executor().clone(),
251        );
252
253        info!(target: "reth::cli", "Consensus engine initialized");
254
255        #[expect(clippy::needless_continue)]
256        let events = stream_select!(
257            event_sender.new_listener().map(Into::into),
258            pipeline_events.map(Into::into),
259            ctx.consensus_layer_events(),
260            pruner_events.map(Into::into),
261            static_file_producer_events.map(Into::into),
262        );
263
264        ctx.task_executor().spawn_critical_task(
265            "events task",
266            node::handle_events(
267                Some(Box::new(ctx.components().network().clone())),
268                Some(ctx.head().number),
269                events,
270            ),
271        );
272
273        let RpcHandle {
274            rpc_server_handles,
275            rpc_registry,
276            engine_events,
277            beacon_engine_handle,
278            engine_shutdown: _,
279        } = add_ons.launch_add_ons(add_ons_ctx).await?;
280
281        // Create engine shutdown handle
282        let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
283
284        // Run consensus engine to completion
285        let initial_target = ctx.initial_backfill_target()?;
286        let mut built_payloads = ctx
287            .components()
288            .payload_builder_handle()
289            .subscribe()
290            .await
291            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
292            .into_built_payload_stream()
293            .fuse();
294
295        let chainspec = ctx.chain_spec();
296        let provider = ctx.blockchain_db().clone();
297        let (exit, rx) = oneshot::channel();
298        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
299        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
300
301        info!(target: "reth::cli", "Starting consensus engine");
302        let consensus_engine = move |mut on_graceful_shutdown| async move {
303            if let Some(initial_target) = initial_target {
304                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
305                // network_handle's sync state is already initialized at Syncing
306                orchestrator.start_backfill_sync(initial_target);
307            } else if startup_sync_state_idle {
308                network_handle.update_sync_state(SyncState::Idle);
309            }
310
311            let mut res = Ok(());
312            let mut shutdown_rx = shutdown_rx.fuse();
313
314            // advance the chain and await payloads built locally to add into the engine api
315            // tree handler to prevent re-execution if that block is received as payload from
316            // the CL
317            loop {
318                tokio::select! {
319                    event = orchestrator.next() => {
320                        let Some(event) = event else { break };
321                        debug!(target: "reth::cli", "Event: {event}");
322                        match event {
323                            ChainEvent::BackfillSyncFinished => {
324                                if terminate_after_backfill {
325                                    debug!(target: "reth::cli", "Terminating after initial backfill");
326                                    break
327                                }
328                                if startup_sync_state_idle {
329                                    network_handle.update_sync_state(SyncState::Idle);
330                                }
331                            }
332                            ChainEvent::BackfillSyncStarted => {
333                                network_handle.update_sync_state(SyncState::Syncing);
334                            }
335                            ChainEvent::FatalError => {
336                                error!(target: "reth::cli", "Fatal error in consensus engine");
337                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
338                                break
339                            }
340                            ChainEvent::Handler(ev) => {
341                                if let Some(head) = ev.canonical_header() {
342                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
343                                    network_handle.update_sync_state(SyncState::Idle);
344                                    let head_block = Head {
345                                        number: head.number(),
346                                        hash: head.hash(),
347                                        difficulty: head.difficulty(),
348                                        timestamp: head.timestamp(),
349                                        total_difficulty: chainspec.final_paris_total_difficulty()
350                                            .filter(|_| chainspec.is_paris_active_at_block(head.number()))
351                                            .unwrap_or_default(),
352                                    };
353                                    network_handle.update_status(head_block);
354
355                                    let updated = BlockRangeUpdate {
356                                        earliest: provider.earliest_block_number().unwrap_or_default(),
357                                        latest: head.number(),
358                                        latest_hash: head.hash(),
359                                    };
360                                    network_handle.update_block_range(updated);
361                                }
362                                event_sender.notify(ev);
363                            }
364                        }
365                    }
366                    payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
367                        if let Some(executed_block) = payload.executed_block() {
368                            debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(),  "inserting built payload");
369                            orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
370                        }
371                    }
372                    shutdown_req = &mut shutdown_rx => {
373                        if let Ok(req) = shutdown_req {
374                            debug!(target: "reth::cli", "received engine shutdown request");
375                            orchestrator.handler_mut().handler_mut().on_event(
376                                FromOrchestrator::Terminate { tx: req.done_tx }.into()
377                            );
378                        }
379                    }
380                    _guard = &mut on_graceful_shutdown => {
381                        // Shutdown signal received.
382                        // Send Terminate so the engine OS thread can exit cleanly before we
383                        // drop the orchestrator.
384                        debug!(target: "reth::cli", "shutdown signal received, terminating engine");
385                        let (done_tx, done_rx) = oneshot::channel();
386                        orchestrator.handler_mut().handler_mut().on_event(
387                            FromOrchestrator::Terminate { tx: done_tx }.into()
388                        );
389                        let _ = done_rx.await;
390                        break;
391                    }
392                }
393            }
394
395            let _ = exit.send(res);
396        };
397        ctx.task_executor()
398            .spawn_critical_with_graceful_shutdown_signal("consensus engine", consensus_engine);
399
400        let engine_events_for_ethstats = engine_events.new_listener();
401
402        let full_node = FullNode {
403            evm_config: ctx.components().evm_config().clone(),
404            pool: ctx.components().pool().clone(),
405            network: ctx.components().network().clone(),
406            provider: ctx.node_adapter().provider.clone(),
407            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
408            task_executor: ctx.task_executor().clone(),
409            config: ctx.node_config().clone(),
410            data_dir: ctx.data_dir().clone(),
411            add_ons_handle: RpcHandle {
412                rpc_server_handles,
413                rpc_registry,
414                engine_events,
415                beacon_engine_handle,
416                engine_shutdown,
417            },
418        };
419        // Notify on node started
420        on_node_started.on_event(FullNode::clone(&full_node))?;
421
422        ctx.spawn_ethstats(engine_events_for_ethstats).await?;
423
424        let handle = NodeHandle {
425            node_exit_future: NodeExitFuture::new(async { rx.await? }),
426            node: full_node,
427        };
428
429        Ok(handle)
430    }
431}
432
433impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
434where
435    T: FullNodeTypes<
436        Types: NodeTypesForProvider,
437        Provider = BlockchainProvider<
438            NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
439        >,
440    >,
441    CB: NodeComponentsBuilder<T> + 'static,
442    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
443        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
444        + 'static,
445{
446    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
447    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
448
449    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
450        Box::pin(self.launch_node(target))
451    }
452}