Skip to main content

reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, Node, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10    RethFullAdapter,
11};
12use alloy_consensus::BlockHeader;
13use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
14use reth_chainspec::{EthChainSpec, EthereumHardforks};
15use reth_db::{database_metrics::DatabaseMetrics, Database};
16use reth_engine_tree::{
17    chain::{ChainEvent, FromOrchestrator},
18    engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
19    launch::build_engine_orchestrator,
20    tree::TreeConfig,
21};
22use reth_engine_util::EngineMessageStreamExt;
23use reth_exex::ExExManagerHandle;
24use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
25use reth_network_api::BlockDownloaderProvider;
26use reth_node_api::{
27    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
28};
29use reth_node_core::{
30    args::PruneConfigKind,
31    dirs::{ChainPath, DataDirPath},
32    exit::NodeExitFuture,
33    primitives::Head,
34};
35use reth_node_events::node;
36use reth_provider::{
37    providers::{BlockchainProvider, NodeTypesForProvider},
38    BlockNumReader, StorageSettingsCache,
39};
40use reth_tasks::TaskExecutor;
41use reth_tokio_util::EventSender;
42use reth_tracing::tracing::{debug, error, info};
43use reth_trie_db::ChangesetCache;
44use std::{future::Future, pin::Pin, sync::Arc};
45use tokio::sync::{mpsc::unbounded_channel, oneshot};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47
48/// The engine node launcher.
49#[derive(Debug)]
50pub struct EngineNodeLauncher {
51    /// The task executor for the node.
52    pub ctx: LaunchContext,
53
54    /// Temporary configuration for engine tree.
55    /// After engine is stabilized, this should be configured through node builder.
56    pub engine_tree_config: TreeConfig,
57}
58
59impl EngineNodeLauncher {
60    /// Create a new instance of the ethereum node launcher.
61    pub const fn new(
62        task_executor: TaskExecutor,
63        data_dir: ChainPath<DataDirPath>,
64        engine_tree_config: TreeConfig,
65    ) -> Self {
66        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
67    }
68
69    async fn launch_node<N, DB, T, CB, AO>(
70        self,
71        target: NodeBuilderWithComponents<T, CB, AO>,
72    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
73    where
74        N: Node<RethFullAdapter<DB, N>> + NodeTypesForProvider,
75        DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
76        T: FullNodeTypes<
77            Types = N,
78            Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, DB>>,
79            DB = DB,
80        >,
81        CB: NodeComponentsBuilder<T>,
82        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
83            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
84    {
85        let Self { ctx, engine_tree_config } = self;
86        let NodeBuilderWithComponents {
87            adapter: NodeTypesAdapter { database },
88            rocksdb_provider,
89            components_builder,
90            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
91            config,
92        } = target;
93        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
94
95        // Create changeset cache that will be shared across the engine
96        let changeset_cache = ChangesetCache::new();
97        let disabled_stages = N::disabled_stages();
98
99        // setup the launch context
100        let ctx = ctx
101            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
102            // load the toml config
103            .with_loaded_toml_config(config)?
104            // add resolved peers
105            .with_resolved_peers()?
106            // attach the database
107            .attach(database.clone())
108            // ensure certain settings take effect
109            .with_adjusted_configs()
110            // Create the provider factory with changeset cache
111            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(
112                changeset_cache.clone(),
113                rocksdb_provider,
114                disabled_stages,
115            )
116            .await?
117            .inspect(|_| {
118                info!(target: "reth::cli", "Database opened");
119            })
120            .with_prometheus_server().await?
121            .inspect(|this| {
122                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
123            })
124            .with_genesis()?
125            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
126                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
127                let settings = this.provider_factory().cached_storage_settings();
128                let pruning_mode =
129                    PruneConfigKind::from_config(&this.prune_config(), this.chain_spec().as_ref()).as_str();
130                info!(target: "reth::cli", ?settings, ?pruning_mode, "Loaded storage settings");
131            })
132            .with_metrics_task()
133            // passing FullNodeTypes as type parameter here so that we can build
134            // later the components.
135            .with_blockchain_db::<T, _>(move |provider_factory| {
136                Ok(BlockchainProvider::new(provider_factory)?)
137            })?
138            .with_components(components_builder, on_component_initialized).await?;
139
140        // spawn exexs if any
141        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
142
143        // create pipeline
144        let network_handle = ctx.components().network().clone();
145        let network_client = network_handle.fetch_client().await?;
146        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
147
148        let node_config = ctx.node_config();
149
150        // We always assume that node is syncing after a restart
151        network_handle.update_sync_state(SyncState::Syncing);
152
153        let max_block = ctx.max_block(network_client.clone()).await?;
154
155        let static_file_producer = ctx.static_file_producer();
156        let static_file_producer_events = static_file_producer.lock().events();
157        info!(target: "reth::cli", "StaticFileProducer initialized");
158
159        let consensus = Arc::new(ctx.components().consensus().clone());
160
161        let pipeline = build_networked_pipeline(
162            &ctx.toml_config().stages,
163            network_client.clone(),
164            consensus.clone(),
165            ctx.provider_factory().clone(),
166            ctx.task_executor(),
167            ctx.sync_metrics_tx(),
168            ctx.prune_config(),
169            max_block,
170            static_file_producer,
171            ctx.components().evm_config().clone(),
172            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
173            ctx.era_import_source(),
174            disabled_stages,
175        )?;
176
177        // The new engine writes directly to static files. This ensures that they're up to the tip.
178        pipeline.move_to_static_files()?;
179
180        let pipeline_events = pipeline.events();
181
182        let mut pruner_builder = ctx.pruner_builder();
183        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
184            pruner_builder =
185                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
186        }
187        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
188        let pruner_events = pruner.events();
189        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
190
191        let event_sender = EventSender::default();
192
193        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
194
195        // extract the jwt secret from the args if possible
196        let jwt_secret = ctx.auth_jwt_secret()?;
197
198        let add_ons_ctx = AddOnsContext {
199            node: ctx.node_adapter().clone(),
200            config: ctx.node_config(),
201            beacon_engine_handle: beacon_engine_handle.clone(),
202            jwt_secret,
203            engine_events: event_sender.clone(),
204        };
205        let validator_builder = add_ons.engine_validator_builder();
206
207        // Build the engine validator with all required components
208        let engine_validator = validator_builder
209            .clone()
210            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
211            .await?;
212
213        // Create the consensus engine stream with optional reorg
214        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
215            .maybe_skip_fcu(node_config.debug.skip_fcu)
216            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
217            .maybe_reorg(
218                ctx.blockchain_db().clone(),
219                ctx.components().evm_config().clone(),
220                || async {
221                    validator_builder
222                        .build_tree_validator(
223                            &add_ons_ctx,
224                            engine_tree_config.clone(),
225                            changeset_cache.clone(),
226                        )
227                        .await
228                },
229                node_config.debug.reorg_frequency,
230                node_config.debug.reorg_depth,
231            )
232            .await?
233            // Store messages _after_ skipping so that `replay-engine` command
234            // would replay only the messages that were observed by the engine
235            // during this run.
236            .maybe_store_messages(node_config.debug.engine_api_store.clone());
237
238        let engine_kind = if ctx.chain_spec().is_optimism() {
239            EngineApiKind::OpStack
240        } else {
241            EngineApiKind::Ethereum
242        };
243
244        let mut orchestrator = build_engine_orchestrator(
245            engine_kind,
246            consensus.clone(),
247            network_client.clone(),
248            Box::pin(consensus_engine_stream),
249            pipeline,
250            ctx.task_executor().clone(),
251            ctx.provider_factory().clone(),
252            ctx.blockchain_db().clone(),
253            pruner,
254            ctx.components().payload_builder_handle().clone(),
255            engine_validator,
256            engine_tree_config,
257            ctx.sync_metrics_tx(),
258            ctx.components().evm_config().clone(),
259            changeset_cache,
260            ctx.task_executor().clone(),
261        );
262
263        info!(target: "reth::cli", "Consensus engine initialized");
264
265        #[expect(clippy::needless_continue)]
266        let events = stream_select!(
267            event_sender.new_listener().map(Into::into),
268            pipeline_events.map(Into::into),
269            ctx.consensus_layer_events(),
270            pruner_events.map(Into::into),
271            static_file_producer_events.map(Into::into),
272        );
273
274        ctx.task_executor().spawn_critical_task(
275            "events task",
276            node::handle_events(
277                Some(Box::new(ctx.components().network().clone())),
278                Some(ctx.head().number),
279                events,
280            ),
281        );
282
283        let RpcHandle {
284            rpc_server_handles,
285            rpc_registry,
286            engine_events,
287            beacon_engine_handle,
288            engine_shutdown: _,
289        } = add_ons.launch_add_ons(add_ons_ctx).await?;
290
291        // Create engine shutdown handle
292        let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
293
294        // Run consensus engine to completion
295        let initial_target = ctx.initial_backfill_target(disabled_stages)?;
296        let mut built_payloads = ctx
297            .components()
298            .payload_builder_handle()
299            .subscribe()
300            .await
301            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
302            .into_built_payload_stream()
303            .fuse();
304
305        let chainspec = ctx.chain_spec();
306        let provider = ctx.blockchain_db().clone();
307        let (exit, rx) = oneshot::channel();
308        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
309        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
310
311        info!(target: "reth::cli", "Starting consensus engine");
312        let consensus_engine = move |mut on_graceful_shutdown| async move {
313            if let Some(initial_target) = initial_target {
314                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
315                // network_handle's sync state is already initialized at Syncing
316                orchestrator.start_backfill_sync(initial_target);
317            } else if startup_sync_state_idle {
318                network_handle.update_sync_state(SyncState::Idle);
319            }
320
321            let mut res = Ok(());
322            let mut shutdown_rx = shutdown_rx.fuse();
323
324            // advance the chain and await payloads built locally to add into the engine api
325            // tree handler to prevent re-execution if that block is received as payload from
326            // the CL
327            loop {
328                tokio::select! {
329                    event = orchestrator.next() => {
330                        let Some(event) = event else { break };
331                        debug!(target: "reth::cli", "Event: {event}");
332                        match event {
333                            ChainEvent::BackfillSyncFinished => {
334                                if terminate_after_backfill {
335                                    debug!(target: "reth::cli", "Terminating after initial backfill");
336                                    break
337                                }
338                                if startup_sync_state_idle {
339                                    network_handle.update_sync_state(SyncState::Idle);
340                                }
341                            }
342                            ChainEvent::BackfillSyncStarted => {
343                                network_handle.update_sync_state(SyncState::Syncing);
344                            }
345                            ChainEvent::FatalError => {
346                                error!(target: "reth::cli", "Fatal error in consensus engine");
347                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
348                                break
349                            }
350                            ChainEvent::Handler(ev) => {
351                                if let Some(head) = ev.canonical_header() {
352                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
353                                    network_handle.update_sync_state(SyncState::Idle);
354                                    let head_block = Head {
355                                        number: head.number(),
356                                        hash: head.hash(),
357                                        difficulty: head.difficulty(),
358                                        timestamp: head.timestamp(),
359                                        total_difficulty: chainspec.final_paris_total_difficulty()
360                                            .filter(|_| chainspec.is_paris_active_at_block(head.number()))
361                                            .unwrap_or_default(),
362                                    };
363                                    network_handle.update_status(head_block);
364
365                                    let updated = BlockRangeUpdate {
366                                        earliest: provider.earliest_block_number().unwrap_or_default(),
367                                        latest: head.number(),
368                                        latest_hash: head.hash(),
369                                    };
370                                    network_handle.update_block_range(updated);
371                                }
372                                event_sender.notify(ev);
373                            }
374                        }
375                    }
376                    payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
377                        if let Some(executed_block) = payload.executed_block() {
378                            debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(),  "inserting built payload");
379                            orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
380                        }
381                    }
382                    shutdown_req = &mut shutdown_rx => {
383                        if let Ok(req) = shutdown_req {
384                            debug!(target: "reth::cli", "received engine shutdown request");
385                            orchestrator.handler_mut().handler_mut().on_event(
386                                FromOrchestrator::Terminate { tx: req.done_tx }.into()
387                            );
388                        }
389                    }
390                    _guard = &mut on_graceful_shutdown => {
391                        // Shutdown signal received.
392                        // Send Terminate so the engine OS thread can exit cleanly before we
393                        // drop the orchestrator.
394                        debug!(target: "reth::cli", "shutdown signal received, terminating engine");
395                        let (done_tx, done_rx) = oneshot::channel();
396                        orchestrator.handler_mut().handler_mut().on_event(
397                            FromOrchestrator::Terminate { tx: done_tx }.into()
398                        );
399                        let _ = done_rx.await;
400                        break;
401                    }
402                }
403            }
404
405            let _ = exit.send(res);
406        };
407        ctx.task_executor()
408            .spawn_critical_with_graceful_shutdown_signal("consensus engine", consensus_engine);
409
410        let engine_events_for_ethstats = engine_events.new_listener();
411
412        let full_node = FullNode {
413            evm_config: ctx.components().evm_config().clone(),
414            pool: ctx.components().pool().clone(),
415            network: ctx.components().network().clone(),
416            provider: ctx.node_adapter().provider.clone(),
417            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
418            task_executor: ctx.task_executor().clone(),
419            config: ctx.node_config().clone(),
420            data_dir: ctx.data_dir().clone(),
421            add_ons_handle: RpcHandle {
422                rpc_server_handles,
423                rpc_registry,
424                engine_events,
425                beacon_engine_handle,
426                engine_shutdown,
427            },
428        };
429        // Notify on node started
430        on_node_started.on_event(FullNode::clone(&full_node))?;
431
432        ctx.spawn_ethstats(engine_events_for_ethstats).await?;
433
434        let handle = NodeHandle {
435            node_exit_future: NodeExitFuture::new(async { rx.await? }),
436            node: full_node,
437        };
438
439        Ok(handle)
440    }
441}
442
443impl<N, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
444where
445    T: FullNodeTypes<
446        Types = N,
447        DB = DB,
448        Provider = BlockchainProvider<NodeTypesWithDBAdapter<N, DB>>,
449    >,
450    N: Node<RethFullAdapter<DB, N>> + NodeTypesForProvider,
451    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
452    CB: NodeComponentsBuilder<T> + 'static,
453    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
454        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
455        + 'static,
456{
457    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
458    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
459
460    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
461        Box::pin(self.launch_node(target))
462    }
463}