reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{future::Either, stream, stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_db_api::{database_metrics::DatabaseMetrics, Database};
15use reth_engine_local::{LocalMiner, LocalPayloadAttributesBuilder};
16use reth_engine_service::service::{ChainEvent, EngineService};
17use reth_engine_tree::{
18    engine::{EngineApiRequest, EngineRequestHandler},
19    tree::TreeConfig,
20};
21use reth_engine_util::EngineMessageStreamExt;
22use reth_exex::ExExManagerHandle;
23use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
24use reth_network_api::BlockDownloaderProvider;
25use reth_node_api::{
26    BeaconConsensusEngineHandle, BuiltPayload, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
27    PayloadAttributesBuilder, PayloadTypes,
28};
29use reth_node_core::{
30    args::DefaultEraHost,
31    dirs::{ChainPath, DataDirPath},
32    exit::NodeExitFuture,
33    primitives::Head,
34};
35use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
36use reth_provider::{
37    providers::{BlockchainProvider, NodeTypesForProvider},
38    BlockNumReader,
39};
40use reth_stages::stages::EraImportSource;
41use reth_tasks::TaskExecutor;
42use reth_tokio_util::EventSender;
43use reth_tracing::tracing::{debug, error, info};
44use std::sync::Arc;
45use tokio::sync::{mpsc::unbounded_channel, oneshot};
46use tokio_stream::wrappers::UnboundedReceiverStream;
47
48/// The engine node launcher.
///
/// Bundles the [`LaunchContext`] and the engine [`TreeConfig`] that are consumed when the node
/// is launched through this type's [`LaunchNode`] implementation.
49#[derive(Debug)]
50pub struct EngineNodeLauncher {
51    /// The launch context for the node.
    ///
    /// [`EngineNodeLauncher::new`] builds it from the task executor and the data directory.
52    pub ctx: LaunchContext,
53
54    /// Temporary configuration for engine tree.
55    /// After engine is stabilized, this should be configured through node builder.
56    pub engine_tree_config: TreeConfig,
57}
58
59impl EngineNodeLauncher {
60    /// Creates a new engine node launcher.
    ///
    /// `task_executor` and `data_dir` are passed to [`LaunchContext::new`] to build the launch
    /// context; `engine_tree_config` is stored unchanged.
61    pub const fn new(
62        task_executor: TaskExecutor,
63        data_dir: ChainPath<DataDirPath>,
64        engine_tree_config: TreeConfig,
65    ) -> Self {
66        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
67    }
68}
69
70impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
71where
72    Types: NodeTypesForProvider + NodeTypes,
73    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
74    T: FullNodeTypes<
75        Types = Types,
76        DB = DB,
77        Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
78    >,
79    CB: NodeComponentsBuilder<T>,
80    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
81        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
    // NOTE(review): presumably needed because a `LocalPayloadAttributesBuilder` is handed to the
    // `LocalMiner` spawned in dev mode below -- confirm against `LocalMiner::new`.
82    LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
83        <<Types as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
84    >,
85{
    /// The handle returned to the caller once the node has been launched.
86    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
87
    /// Launches the node described by `target` and returns its [`NodeHandle`].
    ///
    /// In order, this:
    /// 1. builds the launch context (config, database, provider factory, metrics, genesis,
    ///    blockchain provider, components),
    /// 2. launches the installed ExExs,
    /// 3. builds the networked pipeline and the pruner,
    /// 4. creates the [`EngineService`] fed by the consensus engine message channel,
    /// 5. spawns the local miner (dev mode only) and the node events task, then launches the
    ///    add-ons,
    /// 6. spawns the "consensus engine" task that drives the engine service until it stops.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as any fallible setup step (the calls followed by `?`) fails.
88    async fn launch_node(
89        self,
90        target: NodeBuilderWithComponents<T, CB, AO>,
91    ) -> eyre::Result<Self::Node> {
        // Take the launcher and the builder apart so every piece can be moved where it is needed.
92        let Self { ctx, engine_tree_config } = self;
93        let NodeBuilderWithComponents {
94            adapter: NodeTypesAdapter { database },
95            components_builder,
96            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
97            config,
98        } = target;
99        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
100
101        // setup the launch context
102        let ctx = ctx
103            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
104            // load the toml config
105            .with_loaded_toml_config(config)?
106            // add resolved peers
107            .with_resolved_peers()?
108            // attach the database
109            .attach(database.clone())
110            // ensure certain settings take effect
111            .with_adjusted_configs()
112            // Create the provider factory
113            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
114            .inspect(|_| {
115                info!(target: "reth::cli", "Database opened");
116            })
117            .with_prometheus_server().await?
118            .inspect(|this| {
119                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
120            })
121            .with_genesis()?
122            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
123                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
124            })
125            .with_metrics_task()
126            // passing FullNodeTypes as type parameter here so that we can build
127            // later the components.
128            .with_blockchain_db::<T, _>(move |provider_factory| {
129                Ok(BlockchainProvider::new(provider_factory)?)
130            })?
131            .with_components(components_builder, on_component_initialized).await?;
132
133        // spawn exexs
        // The returned manager handle is optional; both uses below handle the `None` case.
134        let exex_manager_handle = ExExLauncher::new(
135            ctx.head(),
136            ctx.node_adapter().clone(),
137            installed_exex,
138            ctx.configs().clone(),
139        )
140        .launch()
141        .await?;
142
143        // create pipeline
        // First gather what the pipeline and engine need: the network handle, its fetch client,
        // and the channel over which consensus engine messages reach the engine service.
144        let network_handle = ctx.components().network().clone();
145        let network_client = network_handle.fetch_client().await?;
146        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
147
148        let node_config = ctx.node_config();
149
150        // We always assume that node is syncing after a restart
151        network_handle.update_sync_state(SyncState::Syncing);
152
        // NOTE(review): presumably an optional upper bound for the pipeline -- confirm against
        // `LaunchContext::max_block`.
153        let max_block = ctx.max_block(network_client.clone()).await?;
154
        // Grab the producer's event stream here, before the producer itself is moved into the
        // pipeline below.
155        let static_file_producer = ctx.static_file_producer();
156        let static_file_producer_events = static_file_producer.lock().events();
157        info!(target: "reth::cli", "StaticFileProducer initialized");
158
        // Wrapped in an `Arc` because both the pipeline and the engine service get a clone.
159        let consensus = Arc::new(ctx.components().consensus().clone());
160
161        // Configure the pipeline
        // Fall back to an empty ExEx manager handle when no ExEx manager was launched.
162        let pipeline_exex_handle =
163            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
164
        // An ERA import source is only constructed when ERA import is enabled in the node config.
        // NOTE(review): the two closures look like lazily evaluated defaults (default ERA host for
        // the chain, `<datadir>/era` directory) -- confirm against `EraImportSource::maybe_new`.
165        let era_import_source = if node_config.era.enabled {
166            EraImportSource::maybe_new(
167                node_config.era.source.path.clone(),
168                node_config.era.source.url.clone(),
169                || node_config.chain.chain().kind().default_era_host(),
170                || node_config.datadir().data_dir().join("era").into(),
171            )
172        } else {
173            None
174        };
175
176        let pipeline = build_networked_pipeline(
177            &ctx.toml_config().stages,
178            network_client.clone(),
179            consensus.clone(),
180            ctx.provider_factory().clone(),
181            ctx.task_executor(),
182            ctx.sync_metrics_tx(),
183            ctx.prune_config(),
184            max_block,
185            static_file_producer,
186            ctx.components().evm_config().clone(),
187            pipeline_exex_handle,
188            era_import_source,
189        )?;
190
191        // The new engine writes directly to static files. This ensures that they're up to the tip.
192        pipeline.move_to_static_files()?;
193
        // Subscribe to pipeline events before the pipeline is moved into the engine service.
194        let pipeline_events = pipeline.events();
195
        // Build the pruner; when an ExEx manager exists, its finished height is passed to the
        // pruner builder.
196        let mut pruner_builder = ctx.pruner_builder();
197        if let Some(exex_manager_handle) = &exex_manager_handle {
198            pruner_builder =
199                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
200        }
201        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
202        let pruner_events = pruner.events();
203        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
204
        // Broadcasts engine events: a listener feeds the node events stream, a clone is handed to
        // the add-ons, and the engine task below notifies it.
205        let event_sender = EventSender::default();
206
        // Handle wrapping the sending half of the consensus engine message channel.
207        let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
208
209        // extract the jwt secret from the args if possible
210        let jwt_secret = ctx.auth_jwt_secret()?;
211
        // Context for the add-ons: borrowed first to build the engine validator, then consumed by
        // `launch_add_ons` further down.
212        let add_ons_ctx = AddOnsContext {
213            node: ctx.node_adapter().clone(),
214            config: ctx.node_config(),
215            beacon_engine_handle: beacon_engine_handle.clone(),
216            jwt_secret,
217            engine_events: event_sender.clone(),
218        };
219        let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
220
        // Turn the receiving half of the channel into a stream and wrap it in the optional
        // (`maybe_*`) adapters driven by the `debug` section of the node config.
221        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
222            .maybe_skip_fcu(node_config.debug.skip_fcu)
223            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
224            .maybe_reorg(
225                ctx.blockchain_db().clone(),
226                ctx.components().evm_config().clone(),
227                engine_payload_validator.clone(),
228                node_config.debug.reorg_frequency,
229                node_config.debug.reorg_depth,
230            )
231            // Store messages _after_ skipping so that `replay-engine` command
232            // would replay only the messages that were observed by the engine
233            // during this run.
234            .maybe_store_messages(node_config.debug.engine_api_store.clone());
235
        // The engine service takes ownership of the message stream, the pipeline and the pruner.
236        let mut engine_service = EngineService::new(
237            consensus.clone(),
238            ctx.chain_spec(),
239            network_client.clone(),
240            Box::pin(consensus_engine_stream),
241            pipeline,
242            Box::new(ctx.task_executor().clone()),
243            ctx.provider_factory().clone(),
244            ctx.blockchain_db().clone(),
245            pruner,
246            ctx.components().payload_builder_handle().clone(),
247            engine_payload_validator,
248            engine_tree_config,
249            ctx.invalid_block_hook()?,
250            ctx.sync_metrics_tx(),
251            ctx.components().evm_config().clone(),
252        );
253
        // In dev mode, spawn a `LocalMiner` as a critical task. It is given the beacon engine
        // handle and the payload builder handle.
        // NOTE(review): presumably this stands in for an external consensus client -- confirm.
254        if ctx.is_dev() {
255            ctx.task_executor().spawn_critical(
256                "local engine",
257                LocalMiner::new(
258                    ctx.blockchain_db().clone(),
259                    LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
260                    beacon_engine_handle.clone(),
261                    ctx.dev_mining_mode(ctx.components().pool()),
262                    ctx.components().payload_builder_handle().clone(),
263                )
264                .run(),
265            );
266        }
267
268        info!(target: "reth::cli", "Consensus engine initialized");
269
        // Merge every event source into one stream for the node events task. Consensus layer
        // health events are only included when no debug tip is configured and the node is not in
        // dev mode; otherwise an empty stream takes their place.
270        let events = stream_select!(
271            event_sender.new_listener().map(Into::into),
272            pipeline_events.map(Into::into),
273            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
274                Either::Left(
275                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
276                        .map(Into::into),
277                )
278            } else {
279                Either::Right(stream::empty())
280            },
281            pruner_events.map(Into::into),
282            static_file_producer_events.map(Into::into),
283        );
284
        // The events handler receives the network handle, the current head block number and the
        // merged event stream.
285        ctx.task_executor().spawn_critical(
286            "events task",
287            node::handle_events(
288                Some(Box::new(ctx.components().network().clone())),
289                Some(ctx.head().number),
290                events,
291            ),
292        );
293
        // Launch the add-ons (consuming `add_ons_ctx`) and take the returned RPC handle apart; it
        // is reassembled inside `FullNode` below.
294        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
295            add_ons.launch_add_ons(add_ons_ctx).await?;
296
297        // Run consensus engine to completion
298        let initial_target = ctx.initial_backfill_target()?;
        // Stream of payloads built by the payload builder. It is fused because
        // `select_next_some`, used in the loop below, requires a fused stream.
299        let mut built_payloads = ctx
300            .components()
301            .payload_builder_handle()
302            .subscribe()
303            .await
304            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
305            .into_built_payload_stream()
306            .fuse();
307
        // Values moved into the engine task below. `exit`/`rx` carry the task's final result to
        // the node exit future.
308        let chainspec = ctx.chain_spec();
309        let provider = ctx.blockchain_db().clone();
310        let (exit, rx) = oneshot::channel();
311        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
312
313        info!(target: "reth::cli", "Starting consensus engine");
314        ctx.task_executor().spawn_critical("consensus engine", async move {
            // Kick off backfill sync first if an initial target was determined.
315            if let Some(initial_target) = initial_target {
316                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
317                engine_service.orchestrator_mut().start_backfill_sync(initial_target);
318            }
319
            // Result reported through `exit`; stays `Ok` unless a fatal error is observed.
320            let mut res = Ok(());
321
322            // Drive the engine service and, concurrently, feed locally built payloads into the
            // engine API tree handler so that a block is not re-executed if the CL later sends
            // it back as a payload.
323            loop {
324                tokio::select! {
325                    payload = built_payloads.select_next_some() => {
                        // Only payloads that carry an executed block are inserted.
326                        if let Some(executed_block) = payload.executed_block() {
327                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(),  "inserting built payload");
328                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
329                        }
330                    }
331                    event = engine_service.next() => {
                        // The engine service stream ended: leave the loop with the current result.
332                        let Some(event) = event else { break };
333                        debug!(target: "reth::cli", "Event: {event}");
334                        match event {
335                            ChainEvent::BackfillSyncFinished => {
                                // Keep running after backfill unless termination was requested.
336                                if terminate_after_backfill {
337                                    debug!(target: "reth::cli", "Terminating after initial backfill");
338                                    break
339                                }
340                            }
341                            ChainEvent::BackfillSyncStarted => {
342                                network_handle.update_sync_state(SyncState::Syncing);
343                            }
344                            ChainEvent::FatalError => {
                                // Record the failure so the node exit future resolves with an error.
345                                error!(target: "reth::cli", "Fatal error in consensus engine");
346                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
347                                break
348                            }
349                            ChainEvent::Handler(ev) => {
                                // Events carrying a canonical header update the network's view of
                                // the chain (sync state, status, block range).
350                                if let Some(head) = ev.canonical_header() {
351                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
352                                    network_handle.update_sync_state(SyncState::Idle);
353                                                                        let head_block = Head {
354                                        number: head.number(),
355                                        hash: head.hash(),
356                                        difficulty: head.difficulty(),
357                                        timestamp: head.timestamp(),
                                        // The chain spec's final Paris total difficulty if Paris is
                                        // active at this block, otherwise the default value.
358                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
359                                    };
360                                    network_handle.update_status(head_block);
361
                                    // A failed lookup of the earliest block number falls back to
                                    // the default value.
362                                    let updated = BlockRangeUpdate {
363                                        earliest: provider.earliest_block_number().unwrap_or_default(),
364                                        latest:head.number(),
365                                        latest_hash:head.hash()
366                                    };
367                                    network_handle.update_block_range(updated);
368                                }
                                // Every handler event is forwarded to the engine event listeners.
369                                event_sender.notify(ev);
370                            }
371                        }
372                    }
373                }
374            }
375
            // Sending only fails if the receiver was dropped, in which case nobody is waiting for
            // the result, so the error is deliberately ignored.
376            let _ = exit.send(res);
377        });
378
        // Assemble the node from clones of the launched components plus the add-ons handle.
379        let full_node = FullNode {
380            evm_config: ctx.components().evm_config().clone(),
381            pool: ctx.components().pool().clone(),
382            network: ctx.components().network().clone(),
383            provider: ctx.node_adapter().provider.clone(),
384            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
385            task_executor: ctx.task_executor().clone(),
386            config: ctx.node_config().clone(),
387            data_dir: ctx.data_dir().clone(),
388            add_ons_handle: RpcHandle {
389                rpc_server_handles,
390                rpc_registry,
391                engine_events,
392                beacon_engine_handle,
393            },
394        };
395        // Notify on node started
396        on_node_started.on_event(FullNode::clone(&full_node))?;
397
        // The exit future yields the result sent by the engine task; `rx.await?` additionally
        // turns a sender dropped without sending into an error.
        // NOTE(review): the `debug.terminate` flag is passed through as-is -- see
        // `NodeExitFuture::new` for its meaning.
398        let handle = NodeHandle {
399            node_exit_future: NodeExitFuture::new(
400                async { rx.await? },
401                full_node.config.debug.terminate,
402            ),
403            node: full_node,
404        };
405
406        Ok(handle)
407    }
408}