//! Engine node related functionality.

3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_tree::{
15    chain::{ChainEvent, FromOrchestrator},
16    engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
17    launch::build_engine_orchestrator,
18    tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28    dirs::{ChainPath, DataDirPath},
29    exit::NodeExitFuture,
30    primitives::Head,
31};
32use reth_node_events::node;
33use reth_provider::{
34    providers::{BlockchainProvider, NodeTypesForProvider},
35    BlockNumReader, StorageSettingsCache,
36};
37use reth_tasks::TaskExecutor;
38use reth_tokio_util::EventSender;
39use reth_tracing::tracing::{debug, error, info};
40use reth_trie_db::ChangesetCache;
41use std::{future::Future, pin::Pin, sync::Arc};
42use tokio::sync::{mpsc::unbounded_channel, oneshot};
43use tokio_stream::wrappers::UnboundedReceiverStream;
44
/// The engine node launcher.
///
/// Drives the startup of an engine-based node: opens the database, builds the
/// provider factory and node components, constructs the backfill pipeline and
/// engine orchestrator, and spawns the consensus-engine task (see
/// [`Self::launch_node`]).
#[derive(Debug)]
pub struct EngineNodeLauncher {
    /// The launch context holding the task executor and data directory used
    /// during startup.
    pub ctx: LaunchContext,

    /// Temporary configuration for engine tree.
    /// After engine is stabilized, this should be configured through node builder.
    pub engine_tree_config: TreeConfig,
}
55
impl EngineNodeLauncher {
    /// Create a new instance of the engine node launcher.
    pub const fn new(
        task_executor: TaskExecutor,
        data_dir: ChainPath<DataDirPath>,
        engine_tree_config: TreeConfig,
    ) -> Self {
        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
    }

    /// Launches the node to completion of startup: builds the provider stack and
    /// components, runs the staged pipeline setup, wires the engine orchestrator
    /// to the consensus-engine message channel, launches RPC add-ons, and spawns
    /// the long-running consensus engine task.
    ///
    /// Returns a [`NodeHandle`] whose exit future resolves when the engine task
    /// finishes (fatal error, shutdown request, or graceful-shutdown signal).
    async fn launch_node<T, CB, AO>(
        self,
        target: NodeBuilderWithComponents<T, CB, AO>,
    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
    where
        T: FullNodeTypes<
            Types: NodeTypesForProvider,
            Provider = BlockchainProvider<
                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
            >,
        >,
        CB: NodeComponentsBuilder<T>,
        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
    {
        let Self { ctx, engine_tree_config } = self;
        let NodeBuilderWithComponents {
            adapter: NodeTypesAdapter { database },
            rocksdb_provider,
            components_builder,
            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
            config,
        } = target;
        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;

        // Create changeset cache that will be shared across the engine
        // (provider factory, tree validator and orchestrator all receive clones).
        let changeset_cache = ChangesetCache::new();

        // setup the launch context
        let ctx = ctx
            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
            // load the toml config
            .with_loaded_toml_config(config)?
            // add resolved peers
            .with_resolved_peers()?
            // attach the database
            .attach(database.clone())
            // ensure certain settings take effect
            .with_adjusted_configs()
            // Create the provider factory with changeset cache
            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(changeset_cache.clone(), rocksdb_provider).await?
            .inspect(|_| {
                info!(target: "reth::cli", "Database opened");
            })
            .with_prometheus_server().await?
            .inspect(|this| {
                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
            })
            .with_genesis()?
            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
                let settings = this.provider_factory().cached_storage_settings();
                info!(target: "reth::cli", ?settings, "Loaded storage settings");
            })
            .with_metrics_task()
            // passing FullNodeTypes as type parameter here so that we can build
            // later the components.
            .with_blockchain_db::<T, _>(move |provider_factory| {
                Ok(BlockchainProvider::new(provider_factory)?)
            })?
            .with_components(components_builder, on_component_initialized).await?;

        // spawn exexs if any
        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;

        // create pipeline
        let network_handle = ctx.components().network().clone();
        let network_client = network_handle.fetch_client().await?;
        // Channel carrying engine API messages into the orchestrator; the sender
        // side also backs the beacon engine handle handed to the RPC add-ons.
        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

        let node_config = ctx.node_config();

        // We always assume that node is syncing after a restart
        network_handle.update_sync_state(SyncState::Syncing);

        let max_block = ctx.max_block(network_client.clone()).await?;

        let static_file_producer = ctx.static_file_producer();
        let static_file_producer_events = static_file_producer.lock().events();
        info!(target: "reth::cli", "StaticFileProducer initialized");

        let consensus = Arc::new(ctx.components().consensus().clone());

        let pipeline = build_networked_pipeline(
            &ctx.toml_config().stages,
            network_client.clone(),
            consensus.clone(),
            ctx.provider_factory().clone(),
            ctx.task_executor(),
            ctx.sync_metrics_tx(),
            ctx.prune_config(),
            max_block,
            static_file_producer,
            ctx.components().evm_config().clone(),
            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
            ctx.era_import_source(),
        )?;

        // The new engine writes directly to static files. This ensures that they're up to the tip.
        pipeline.move_to_static_files()?;

        let pipeline_events = pipeline.events();

        // Build the pruner; if ExExes are installed, prune only up to the height
        // they have finished processing.
        let mut pruner_builder = ctx.pruner_builder();
        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
            pruner_builder =
                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
        }
        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
        let pruner_events = pruner.events();
        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");

        let event_sender = EventSender::default();

        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());

        // extract the jwt secret from the args if possible
        let jwt_secret = ctx.auth_jwt_secret()?;

        let add_ons_ctx = AddOnsContext {
            node: ctx.node_adapter().clone(),
            config: ctx.node_config(),
            beacon_engine_handle: beacon_engine_handle.clone(),
            jwt_secret,
            engine_events: event_sender.clone(),
        };
        let validator_builder = add_ons.engine_validator_builder();

        // Build the engine validator with all required components
        let engine_validator = validator_builder
            .clone()
            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
            .await?;

        // Create the consensus engine stream with optional reorg
        // (debug-only transformations configured via `node_config.debug`).
        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
            .maybe_skip_fcu(node_config.debug.skip_fcu)
            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
            .maybe_reorg(
                ctx.blockchain_db().clone(),
                ctx.components().evm_config().clone(),
                || async {
                    // Create a separate cache for reorg validator (not shared with main engine)
                    let reorg_cache = ChangesetCache::new();
                    validator_builder
                        .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
                        .await
                },
                node_config.debug.reorg_frequency,
                node_config.debug.reorg_depth,
            )
            .await?
            // Store messages _after_ skipping so that `replay-engine` command
            // would replay only the messages that were observed by the engine
            // during this run.
            .maybe_store_messages(node_config.debug.engine_api_store.clone());

        // Select the engine API flavor from the chain spec.
        let engine_kind = if ctx.chain_spec().is_optimism() {
            EngineApiKind::OpStack
        } else {
            EngineApiKind::Ethereum
        };

        let mut orchestrator = build_engine_orchestrator(
            engine_kind,
            consensus.clone(),
            network_client.clone(),
            Box::pin(consensus_engine_stream),
            pipeline,
            ctx.task_executor().clone(),
            ctx.provider_factory().clone(),
            ctx.blockchain_db().clone(),
            pruner,
            ctx.components().payload_builder_handle().clone(),
            engine_validator,
            engine_tree_config,
            ctx.sync_metrics_tx(),
            ctx.components().evm_config().clone(),
            changeset_cache,
            ctx.task_executor().clone(),
        );

        info!(target: "reth::cli", "Consensus engine initialized");

        // Merge all event sources into one stream for the node-events logger task.
        #[allow(clippy::needless_continue)]
        let events = stream_select!(
            event_sender.new_listener().map(Into::into),
            pipeline_events.map(Into::into),
            ctx.consensus_layer_events(),
            pruner_events.map(Into::into),
            static_file_producer_events.map(Into::into),
        );

        ctx.task_executor().spawn_critical_task(
            "events task",
            node::handle_events(
                Some(Box::new(ctx.components().network().clone())),
                Some(ctx.head().number),
                events,
            ),
        );

        // Launch RPC add-ons. The add-ons' own shutdown handle is discarded here;
        // a fresh one is created below and placed into the returned handle.
        let RpcHandle {
            rpc_server_handles,
            rpc_registry,
            engine_events,
            beacon_engine_handle,
            engine_shutdown: _,
        } = add_ons.launch_add_ons(add_ons_ctx).await?;

        // Create engine shutdown handle
        let (engine_shutdown, shutdown_rx) = EngineShutdown::new();

        // Run consensus engine to completion
        let initial_target = ctx.initial_backfill_target()?;
        let mut built_payloads = ctx
            .components()
            .payload_builder_handle()
            .subscribe()
            .await
            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
            .into_built_payload_stream()
            .fuse();

        let chainspec = ctx.chain_spec();
        let provider = ctx.blockchain_db().clone();
        // `exit` resolves the node exit future with the engine task's final result.
        let (exit, rx) = oneshot::channel();
        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;

        info!(target: "reth::cli", "Starting consensus engine");
        let consensus_engine = move |mut on_graceful_shutdown| async move {
            if let Some(initial_target) = initial_target {
                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
                // network_handle's sync state is already initialized at Syncing
                orchestrator.start_backfill_sync(initial_target);
            } else if startup_sync_state_idle {
                network_handle.update_sync_state(SyncState::Idle);
            }

            let mut res = Ok(());
            let mut shutdown_rx = shutdown_rx.fuse();

            // advance the chain and await payloads built locally to add into the engine api
            // tree handler to prevent re-execution if that block is received as payload from
            // the CL
            loop {
                tokio::select! {
                    event = orchestrator.next() => {
                        let Some(event) = event else { break };
                        debug!(target: "reth::cli", "Event: {event}");
                        match event {
                            ChainEvent::BackfillSyncFinished => {
                                if terminate_after_backfill {
                                    debug!(target: "reth::cli", "Terminating after initial backfill");
                                    break
                                }
                                if startup_sync_state_idle {
                                    network_handle.update_sync_state(SyncState::Idle);
                                }
                            }
                            ChainEvent::BackfillSyncStarted => {
                                network_handle.update_sync_state(SyncState::Syncing);
                            }
                            ChainEvent::FatalError => {
                                error!(target: "reth::cli", "Fatal error in consensus engine");
                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
                                break
                            }
                            ChainEvent::Handler(ev) => {
                                if let Some(head) = ev.canonical_header() {
                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
                                    network_handle.update_sync_state(SyncState::Idle);
                                    let head_block = Head {
                                        number: head.number(),
                                        hash: head.hash(),
                                        difficulty: head.difficulty(),
                                        timestamp: head.timestamp(),
                                        // Zero before Paris (merge); fixed final value afterwards.
                                        total_difficulty: chainspec.final_paris_total_difficulty()
                                            .filter(|_| chainspec.is_paris_active_at_block(head.number()))
                                            .unwrap_or_default(),
                                    };
                                    network_handle.update_status(head_block);

                                    // Advertise the served block range to peers.
                                    let updated = BlockRangeUpdate {
                                        earliest: provider.earliest_block_number().unwrap_or_default(),
                                        latest: head.number(),
                                        latest_hash: head.hash(),
                                    };
                                    network_handle.update_block_range(updated);
                                }
                                event_sender.notify(ev);
                            }
                        }
                    }
                    payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
                        if let Some(executed_block) = payload.executed_block() {
                            debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(),  "inserting built payload");
                            orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
                        }
                    }
                    shutdown_req = &mut shutdown_rx => {
                        if let Ok(req) = shutdown_req {
                            debug!(target: "reth::cli", "received engine shutdown request");
                            // Forward the requester's completion channel so it is
                            // notified once the engine has terminated.
                            orchestrator.handler_mut().handler_mut().on_event(
                                FromOrchestrator::Terminate { tx: req.done_tx }.into()
                            );
                        }
                    }
                    _guard = &mut on_graceful_shutdown => {
                        // Shutdown signal received.
                        // Send Terminate so the engine OS thread can exit cleanly before we
                        // drop the orchestrator.
                        debug!(target: "reth::cli", "shutdown signal received, terminating engine");
                        let (done_tx, done_rx) = oneshot::channel();
                        orchestrator.handler_mut().handler_mut().on_event(
                            FromOrchestrator::Terminate { tx: done_tx }.into()
                        );
                        let _ = done_rx.await;
                        break;
                    }
                }
            }

            let _ = exit.send(res);
        };
        ctx.task_executor()
            .spawn_critical_with_graceful_shutdown_signal("consensus engine", consensus_engine);

        let engine_events_for_ethstats = engine_events.new_listener();

        let full_node = FullNode {
            evm_config: ctx.components().evm_config().clone(),
            pool: ctx.components().pool().clone(),
            network: ctx.components().network().clone(),
            provider: ctx.node_adapter().provider.clone(),
            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
            task_executor: ctx.task_executor().clone(),
            config: ctx.node_config().clone(),
            data_dir: ctx.data_dir().clone(),
            add_ons_handle: RpcHandle {
                rpc_server_handles,
                rpc_registry,
                engine_events,
                beacon_engine_handle,
                engine_shutdown,
            },
        };
        // Notify on node started
        on_node_started.on_event(FullNode::clone(&full_node))?;

        ctx.spawn_ethstats(engine_events_for_ethstats).await?;

        let handle = NodeHandle {
            // Exit future resolves with the engine task's result forwarded via `exit`.
            node_exit_future: NodeExitFuture::new(async { rx.await? }),
            node: full_node,
        };

        Ok(handle)
    }
}
427
/// [`LaunchNode`] implementation that delegates to the inherent async
/// [`EngineNodeLauncher::launch_node`], boxing the future so the trait can
/// expose a nameable, type-erased `Future` type.
impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
where
    T: FullNodeTypes<
        Types: NodeTypesForProvider,
        Provider = BlockchainProvider<
            NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
        >,
    >,
    CB: NodeComponentsBuilder<T> + 'static,
    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
        + 'static,
{
    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;

    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
        // Pin-box the inherent async fn's future to satisfy the associated type.
        Box::pin(self.launch_node(target))
    }
}