reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, FutureExt, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_service::service::{ChainEvent, EngineService};
15use reth_engine_tree::{
16    chain::FromOrchestrator,
17    engine::{EngineApiRequest, EngineRequestHandler},
18    tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28    dirs::{ChainPath, DataDirPath},
29    exit::NodeExitFuture,
30    primitives::Head,
31};
32use reth_node_events::node;
33use reth_provider::{
34    providers::{BlockchainProvider, NodeTypesForProvider},
35    BlockNumReader, MetadataProvider,
36};
37use reth_tasks::TaskExecutor;
38use reth_tokio_util::EventSender;
39use reth_tracing::tracing::{debug, error, info};
40use std::{future::Future, pin::Pin, sync::Arc};
41use tokio::sync::{mpsc::unbounded_channel, oneshot};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43use tracing::warn;
44
45/// The engine node launcher.
///
/// Bundles the [`LaunchContext`] with the engine tree configuration; both are consumed when the
/// node is launched.
46#[derive(Debug)]
47pub struct EngineNodeLauncher {
48    /// The launch context for the node, built from the task executor and the data directory.
49    pub ctx: LaunchContext,
50
51    /// Temporary configuration for engine tree.
52    /// After engine is stabilized, this should be configured through node builder.
53    pub engine_tree_config: TreeConfig,
54}
55
56impl EngineNodeLauncher {
57    /// Create a new instance of the engine node launcher.
    ///
    /// `task_executor` and `data_dir` are used to construct the [`LaunchContext`] stored in
    /// `ctx`; `engine_tree_config` is stored as-is.
58    pub const fn new(
59        task_executor: TaskExecutor,
60        data_dir: ChainPath<DataDirPath>,
61        engine_tree_config: TreeConfig,
62    ) -> Self {
63        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
64    }
65
66    async fn launch_node<T, CB, AO>(
67        self,
68        target: NodeBuilderWithComponents<T, CB, AO>,
69    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
70    where
71        T: FullNodeTypes<
72            Types: NodeTypesForProvider,
73            Provider = BlockchainProvider<
74                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
75            >,
76        >,
77        CB: NodeComponentsBuilder<T>,
78        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
79            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
80    {
        // Launches the engine node described by `target`.
        //
        // In order, this:
        //   1. builds the launch context (config, peers, database, provider factory, metrics,
        //      genesis, blockchain provider, components),
        //   2. launches the installed ExExs and builds the networked pipeline and the pruner,
        //   3. builds the engine validator, the engine message stream and the `EngineService`,
        //   4. spawns the "events task" and launches the add-ons,
        //   5. spawns the "consensus engine" task that drives the engine service,
        //   6. assembles the `FullNode`, fires the `on_node_started` hook and returns a
        //      `NodeHandle` whose exit future receives the consensus engine task's result.
        //
        // Any step that fails (every `?` below) aborts the launch and returns the error.
81        let Self { ctx, engine_tree_config } = self;
82        let NodeBuilderWithComponents {
83            adapter: NodeTypesAdapter { database },
84            components_builder,
85            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
86            config,
87        } = target;
88        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
89
90        // setup the launch context
91        let ctx = ctx
92            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
93            // load the toml config
94            .with_loaded_toml_config(config)?
95            // add resolved peers
96            .with_resolved_peers()?
97            // attach the database
98            .attach(database.clone())
99            // ensure certain settings take effect
100            .with_adjusted_configs()
101            // Create the provider factory
102            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
103            .inspect(|ctx| {
104                info!(target: "reth::cli", "Database opened");
                // Failing to read the storage settings is only logged; it does not abort launch.
105                match ctx.provider_factory().storage_settings() {
106                    Ok(settings) => {
107                        info!(
108                            target: "reth::cli",
109                            ?settings,
110                            "Storage settings"
111                        );
112                    },
113                    Err(err) => {
114                        warn!(
115                            target: "reth::cli",
116                            ?err,
117                            "Failed to get storage settings"
118                        );
119                    },
120                }
121            })
122            .with_prometheus_server().await?
123            .inspect(|this| {
124                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
125            })
126            .with_genesis()?
127            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
128                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
129            })
130            .with_metrics_task()
131            // passing FullNodeTypes as type parameter here so that we can build
132            // later the components.
133            .with_blockchain_db::<T, _>(move |provider_factory| {
134                Ok(BlockchainProvider::new(provider_factory)?)
135            })?
136            .with_components(components_builder, on_component_initialized).await?;
137
138        // spawn exexs if any
139        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
140
141        // create pipeline
142        let network_handle = ctx.components().network().clone();
143        let network_client = network_handle.fetch_client().await?;
        // Channel for engine messages: the sender backs the `ConsensusEngineHandle` created
        // below, the receiver becomes the input stream of the engine service.
144        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
145
146        let node_config = ctx.node_config();
147
148        // We always assume that node is syncing after a restart
149        network_handle.update_sync_state(SyncState::Syncing);
150
151        let max_block = ctx.max_block(network_client.clone()).await?;
152
153        let static_file_producer = ctx.static_file_producer();
154        let static_file_producer_events = static_file_producer.lock().events();
155        info!(target: "reth::cli", "StaticFileProducer initialized");
156
157        let consensus = Arc::new(ctx.components().consensus().clone());
158
159        let pipeline = build_networked_pipeline(
160            &ctx.toml_config().stages,
161            network_client.clone(),
162            consensus.clone(),
163            ctx.provider_factory().clone(),
164            ctx.task_executor(),
165            ctx.sync_metrics_tx(),
166            ctx.prune_config(),
167            max_block,
168            static_file_producer,
169            ctx.components().evm_config().clone(),
            // Fall back to an empty handle when no ExEx manager was launched.
170            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
171            ctx.era_import_source(),
172        )?;
173
174        // The new engine writes directly to static files. This ensures that they're up to the tip.
175        pipeline.move_to_static_files()?;
176
177        let pipeline_events = pipeline.events();
178
179        let mut pruner_builder = ctx.pruner_builder();
        // When an ExEx manager is running, hand its finished height to the pruner builder.
180        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
181            pruner_builder =
182                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
183        }
184        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
185        let pruner_events = pruner.events();
186        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
187
        // Sender for engine events; it is shared with the add-ons context and is notified from
        // the consensus engine task below.
188        let event_sender = EventSender::default();
189
190        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
191
192        // extract the jwt secret from the args if possible
193        let jwt_secret = ctx.auth_jwt_secret()?;
194
195        let add_ons_ctx = AddOnsContext {
196            node: ctx.node_adapter().clone(),
197            config: ctx.node_config(),
198            beacon_engine_handle: beacon_engine_handle.clone(),
199            jwt_secret,
200            engine_events: event_sender.clone(),
201        };
202        let validator_builder = add_ons.engine_validator_builder();
203
204        // Build the engine validator with all required components
        // The builder is cloned here because it is used again by the `maybe_reorg` closure below.
205        let engine_validator = validator_builder
206            .clone()
207            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
208            .await?;
209
210        // Create the consensus engine stream with optional reorg
211        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
212            .maybe_skip_fcu(node_config.debug.skip_fcu)
213            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
214            .maybe_reorg(
215                ctx.blockchain_db().clone(),
216                ctx.components().evm_config().clone(),
217                || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
218                node_config.debug.reorg_frequency,
219                node_config.debug.reorg_depth,
220            )
221            .await?
222            // Store messages _after_ skipping so that `replay-engine` command
223            // would replay only the messages that were observed by the engine
224            // during this run.
225            .maybe_store_messages(node_config.debug.engine_api_store.clone());
226
227        let mut engine_service = EngineService::new(
228            consensus.clone(),
229            ctx.chain_spec(),
230            network_client.clone(),
231            Box::pin(consensus_engine_stream),
232            pipeline,
233            Box::new(ctx.task_executor().clone()),
234            ctx.provider_factory().clone(),
235            ctx.blockchain_db().clone(),
236            pruner,
237            ctx.components().payload_builder_handle().clone(),
238            engine_validator,
239            engine_tree_config,
240            ctx.sync_metrics_tx(),
241            ctx.components().evm_config().clone(),
242        );
243
244        info!(target: "reth::cli", "Consensus engine initialized");
245
        // Merge every event source into the single stream consumed by the events task.
        // NOTE(review): the `allow` presumably silences a lint raised inside the
        // `stream_select!` expansion — confirm.
246        #[allow(clippy::needless_continue)]
247        let events = stream_select!(
248            event_sender.new_listener().map(Into::into),
249            pipeline_events.map(Into::into),
250            ctx.consensus_layer_events(),
251            pruner_events.map(Into::into),
252            static_file_producer_events.map(Into::into),
253        );
254
255        ctx.task_executor().spawn_critical(
256            "events task",
257            Box::pin(node::handle_events(
258                Some(Box::new(ctx.components().network().clone())),
259                Some(ctx.head().number),
260                events,
261            )),
262        );
263
        // The `engine_shutdown` returned by the add-ons is discarded; the handle created right
        // below is the one stored in the `FullNode`'s `RpcHandle`.
264        let RpcHandle {
265            rpc_server_handles,
266            rpc_registry,
267            engine_events,
268            beacon_engine_handle,
269            engine_shutdown: _,
270        } = add_ons.launch_add_ons(add_ons_ctx).await?;
271
272        // Create engine shutdown handle
273        let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
274
275        // Run consensus engine to completion
276        let initial_target = ctx.initial_backfill_target()?;
277        let mut built_payloads = ctx
278            .components()
279            .payload_builder_handle()
280            .subscribe()
281            .await
282            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
283            .into_built_payload_stream()
284            .fuse();
285
286        let chainspec = ctx.chain_spec();
287        let provider = ctx.blockchain_db().clone();
        // `exit` carries the final result of the consensus engine task to the node exit future.
288        let (exit, rx) = oneshot::channel();
289        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
290        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
291
292        info!(target: "reth::cli", "Starting consensus engine");
293        let consensus_engine = async move {
294            if let Some(initial_target) = initial_target {
295                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
296                // network_handle's sync state is already initialized at Syncing
297                engine_service.orchestrator_mut().start_backfill_sync(initial_target);
298            } else if startup_sync_state_idle {
299                network_handle.update_sync_state(SyncState::Idle);
300            }
301
            // Result reported through `exit`; only a fatal engine error turns it into `Err`.
302            let mut res = Ok(());
            // Fused so the branch can still be polled on later iterations once it has completed.
303            let mut shutdown_rx = shutdown_rx.fuse();
304
305            // advance the chain and await payloads built locally to add into the engine api
306            // tree handler to prevent re-execution if that block is received as payload from
307            // the CL
308            loop {
309                tokio::select! {
                    // Shutdown request: forward a terminate command to the engine handler. A
                    // receive error (the `Err` case) is ignored.
310                    shutdown_req = &mut shutdown_rx => {
311                        if let Ok(req) = shutdown_req {
312                            debug!(target: "reth::cli", "received engine shutdown request");
313                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(
314                                FromOrchestrator::Terminate { tx: req.done_tx }.into()
315                            );
316                        }
317                    }
                    // Locally built payload: insert its executed block (if any) into the engine.
318                    payload = built_payloads.select_next_some() => {
319                        if let Some(executed_block) = payload.executed_block() {
320                            debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(),  "inserting built payload");
321                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
322                        }
323                    }
                    // Engine service event; the loop ends once the service stream is exhausted.
324                    event = engine_service.next() => {
325                        let Some(event) = event else { break };
326                        debug!(target: "reth::cli", "Event: {event}");
327                        match event {
328                            ChainEvent::BackfillSyncFinished => {
329                                if terminate_after_backfill {
330                                    debug!(target: "reth::cli", "Terminating after initial backfill");
331                                    break
332                                }
333                                if startup_sync_state_idle {
334                                    network_handle.update_sync_state(SyncState::Idle);
335                                }
336                            }
337                            ChainEvent::BackfillSyncStarted => {
338                                network_handle.update_sync_state(SyncState::Syncing);
339                            }
340                            ChainEvent::FatalError => {
341                                error!(target: "reth::cli", "Fatal error in consensus engine");
342                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
343                                break
344                            }
345                            ChainEvent::Handler(ev) => {
346                                if let Some(head) = ev.canonical_header() {
347                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
348                                    network_handle.update_sync_state(SyncState::Idle);
                                    // Total difficulty is the final Paris value when Paris is
                                    // active at this block, otherwise the default value.
349                                    let head_block = Head {
350                                        number: head.number(),
351                                        hash: head.hash(),
352                                        difficulty: head.difficulty(),
353                                        timestamp: head.timestamp(),
354                                        total_difficulty: chainspec.final_paris_total_difficulty()
355                                            .filter(|_| chainspec.is_paris_active_at_block(head.number()))
356                                            .unwrap_or_default(),
357                                    };
358                                    network_handle.update_status(head_block);
359
                                    // A failed earliest-block lookup falls back to the default.
360                                    let updated = BlockRangeUpdate {
361                                        earliest: provider.earliest_block_number().unwrap_or_default(),
362                                        latest: head.number(),
363                                        latest_hash: head.hash(),
364                                    };
365                                    network_handle.update_block_range(updated);
366                                }
                                // Every handler event is forwarded to the engine event listeners.
367                                event_sender.notify(ev);
368                            }
369                        }
370                    }
371                }
372            }
373
            // A send failure (receiver already dropped) is deliberately ignored.
374            let _ = exit.send(res);
375        };
376        ctx.task_executor().spawn_critical("consensus engine", Box::pin(consensus_engine));
377
378        let engine_events_for_ethstats = engine_events.new_listener();
379
380        let full_node = FullNode {
381            evm_config: ctx.components().evm_config().clone(),
382            pool: ctx.components().pool().clone(),
383            network: ctx.components().network().clone(),
384            provider: ctx.node_adapter().provider.clone(),
385            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
386            task_executor: ctx.task_executor().clone(),
387            config: ctx.node_config().clone(),
388            data_dir: ctx.data_dir().clone(),
389            add_ons_handle: RpcHandle {
390                rpc_server_handles,
391                rpc_registry,
392                engine_events,
393                beacon_engine_handle,
394                engine_shutdown,
395            },
396        };
397        // Notify on node started
398        on_node_started.on_event(FullNode::clone(&full_node))?;
399
400        ctx.spawn_ethstats(engine_events_for_ethstats).await?;
401
        // The exit future yields the result sent by the consensus engine task; a receive error
        // is propagated through `?`.
402        let handle = NodeHandle {
403            node_exit_future: NodeExitFuture::new(
404                async { rx.await? },
405                full_node.config.debug.terminate,
406            ),
407            node: full_node,
408        };
409
410        Ok(handle)
411    }
412}
413
414impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
415where
416    T: FullNodeTypes<
417        Types: NodeTypesForProvider,
418        Provider = BlockchainProvider<
419            NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
420        >,
421    >,
422    CB: NodeComponentsBuilder<T> + 'static,
423    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
424        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
425        + 'static,
426{
    // `LaunchNode` implementation that exposes the inherent async `launch_node` as a boxed,
    // `Send` future.

    /// Handle to the launched node together with its add-ons.
427    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
    /// Boxed future resolving to the launched node or the launch error.
428    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
429
    /// Delegates to the inherent `EngineNodeLauncher::launch_node` and pins the returned future
    /// on the heap.
430    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
431        Box::pin(self.launch_node(target))
432    }
433}