reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_db_api::{database_metrics::DatabaseMetrics, Database};
15use reth_engine_service::service::{ChainEvent, EngineService};
16use reth_engine_tree::{
17    engine::{EngineApiRequest, EngineRequestHandler},
18    tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_core::{
28    dirs::{ChainPath, DataDirPath},
29    exit::NodeExitFuture,
30    primitives::Head,
31};
32use reth_node_events::node;
33use reth_provider::{
34    providers::{BlockchainProvider, NodeTypesForProvider},
35    BlockNumReader,
36};
37use reth_tasks::TaskExecutor;
38use reth_tokio_util::EventSender;
39use reth_tracing::tracing::{debug, error, info};
40use std::sync::Arc;
41use tokio::sync::{mpsc::unbounded_channel, oneshot};
42use tokio_stream::wrappers::UnboundedReceiverStream;
43
44/// The engine node launcher.
45#[derive(Debug)]
46pub struct EngineNodeLauncher {
47    /// The task executor for the node.
48    pub ctx: LaunchContext,
49
50    /// Temporary configuration for engine tree.
51    /// After engine is stabilized, this should be configured through node builder.
52    pub engine_tree_config: TreeConfig,
53}
54
55impl EngineNodeLauncher {
56    /// Create a new instance of the ethereum node launcher.
57    pub const fn new(
58        task_executor: TaskExecutor,
59        data_dir: ChainPath<DataDirPath>,
60        engine_tree_config: TreeConfig,
61    ) -> Self {
62        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
63    }
64}
65
66impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
67where
68    Types: NodeTypesForProvider + NodeTypes,
69    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
70    T: FullNodeTypes<
71        Types = Types,
72        DB = DB,
73        Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
74    >,
75    CB: NodeComponentsBuilder<T>,
76    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
77        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
78{
79    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
80
81    async fn launch_node(
82        self,
83        target: NodeBuilderWithComponents<T, CB, AO>,
84    ) -> eyre::Result<Self::Node> {
85        let Self { ctx, engine_tree_config } = self;
86        let NodeBuilderWithComponents {
87            adapter: NodeTypesAdapter { database },
88            components_builder,
89            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
90            config,
91        } = target;
92        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
93
94        // setup the launch context
95        let ctx = ctx
96            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
97            // load the toml config
98            .with_loaded_toml_config(config)?
99            // add resolved peers
100            .with_resolved_peers()?
101            // attach the database
102            .attach(database.clone())
103            // ensure certain settings take effect
104            .with_adjusted_configs()
105            // Create the provider factory
106            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
107            .inspect(|_| {
108                info!(target: "reth::cli", "Database opened");
109            })
110            .with_prometheus_server().await?
111            .inspect(|this| {
112                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
113            })
114            .with_genesis()?
115            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
116                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
117            })
118            .with_metrics_task()
119            // passing FullNodeTypes as type parameter here so that we can build
120            // later the components.
121            .with_blockchain_db::<T, _>(move |provider_factory| {
122                Ok(BlockchainProvider::new(provider_factory)?)
123            })?
124            .with_components(components_builder, on_component_initialized).await?;
125
126        // Try to expire pre-merge transaction history if configured
127        ctx.expire_pre_merge_transactions()?;
128
129        // spawn exexs if any
130        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
131
132        // create pipeline
133        let network_handle = ctx.components().network().clone();
134        let network_client = network_handle.fetch_client().await?;
135        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
136
137        let node_config = ctx.node_config();
138
139        // We always assume that node is syncing after a restart
140        network_handle.update_sync_state(SyncState::Syncing);
141
142        let max_block = ctx.max_block(network_client.clone()).await?;
143
144        let static_file_producer = ctx.static_file_producer();
145        let static_file_producer_events = static_file_producer.lock().events();
146        info!(target: "reth::cli", "StaticFileProducer initialized");
147
148        let consensus = Arc::new(ctx.components().consensus().clone());
149
150        let pipeline = build_networked_pipeline(
151            &ctx.toml_config().stages,
152            network_client.clone(),
153            consensus.clone(),
154            ctx.provider_factory().clone(),
155            ctx.task_executor(),
156            ctx.sync_metrics_tx(),
157            ctx.prune_config(),
158            max_block,
159            static_file_producer,
160            ctx.components().evm_config().clone(),
161            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
162            ctx.era_import_source(),
163        )?;
164
165        // The new engine writes directly to static files. This ensures that they're up to the tip.
166        pipeline.move_to_static_files()?;
167
168        let pipeline_events = pipeline.events();
169
170        let mut pruner_builder = ctx.pruner_builder();
171        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
172            pruner_builder =
173                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
174        }
175        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
176        let pruner_events = pruner.events();
177        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
178
179        let event_sender = EventSender::default();
180
181        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
182
183        // extract the jwt secret from the args if possible
184        let jwt_secret = ctx.auth_jwt_secret()?;
185
186        let add_ons_ctx = AddOnsContext {
187            node: ctx.node_adapter().clone(),
188            config: ctx.node_config(),
189            beacon_engine_handle: beacon_engine_handle.clone(),
190            jwt_secret,
191            engine_events: event_sender.clone(),
192        };
193        let validator_builder = add_ons.engine_validator_builder();
194
195        // Build the engine validator with all required components
196        let engine_validator = validator_builder
197            .clone()
198            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
199            .await?;
200
201        // Create the consensus engine stream with optional reorg
202        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
203            .maybe_skip_fcu(node_config.debug.skip_fcu)
204            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
205            .maybe_reorg(
206                ctx.blockchain_db().clone(),
207                ctx.components().evm_config().clone(),
208                || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
209                node_config.debug.reorg_frequency,
210                node_config.debug.reorg_depth,
211            )
212            .await?
213            // Store messages _after_ skipping so that `replay-engine` command
214            // would replay only the messages that were observed by the engine
215            // during this run.
216            .maybe_store_messages(node_config.debug.engine_api_store.clone());
217
218        let mut engine_service = EngineService::new(
219            consensus.clone(),
220            ctx.chain_spec(),
221            network_client.clone(),
222            Box::pin(consensus_engine_stream),
223            pipeline,
224            Box::new(ctx.task_executor().clone()),
225            ctx.provider_factory().clone(),
226            ctx.blockchain_db().clone(),
227            pruner,
228            ctx.components().payload_builder_handle().clone(),
229            engine_validator,
230            engine_tree_config,
231            ctx.sync_metrics_tx(),
232            ctx.components().evm_config().clone(),
233        );
234
235        info!(target: "reth::cli", "Consensus engine initialized");
236
237        let events = stream_select!(
238            event_sender.new_listener().map(Into::into),
239            pipeline_events.map(Into::into),
240            ctx.consensus_layer_events(),
241            pruner_events.map(Into::into),
242            static_file_producer_events.map(Into::into),
243        );
244
245        ctx.task_executor().spawn_critical(
246            "events task",
247            Box::pin(node::handle_events(
248                Some(Box::new(ctx.components().network().clone())),
249                Some(ctx.head().number),
250                events,
251            )),
252        );
253
254        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
255            add_ons.launch_add_ons(add_ons_ctx).await?;
256
257        // Run consensus engine to completion
258        let initial_target = ctx.initial_backfill_target()?;
259        let mut built_payloads = ctx
260            .components()
261            .payload_builder_handle()
262            .subscribe()
263            .await
264            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
265            .into_built_payload_stream()
266            .fuse();
267
268        let chainspec = ctx.chain_spec();
269        let provider = ctx.blockchain_db().clone();
270        let (exit, rx) = oneshot::channel();
271        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
272
273        info!(target: "reth::cli", "Starting consensus engine");
274        ctx.task_executor().spawn_critical("consensus engine", Box::pin(async move {
275            if let Some(initial_target) = initial_target {
276                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
277                engine_service.orchestrator_mut().start_backfill_sync(initial_target);
278            }
279
280            let mut res = Ok(());
281
282            // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
283            loop {
284                tokio::select! {
285                    payload = built_payloads.select_next_some() => {
286                        if let Some(executed_block) = payload.executed_block() {
287                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(),  "inserting built payload");
288                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
289                        }
290                    }
291                    event = engine_service.next() => {
292                        let Some(event) = event else { break };
293                        debug!(target: "reth::cli", "Event: {event}");
294                        match event {
295                            ChainEvent::BackfillSyncFinished => {
296                                if terminate_after_backfill {
297                                    debug!(target: "reth::cli", "Terminating after initial backfill");
298                                    break
299                                }
300                            }
301                            ChainEvent::BackfillSyncStarted => {
302                                network_handle.update_sync_state(SyncState::Syncing);
303                            }
304                            ChainEvent::FatalError => {
305                                error!(target: "reth::cli", "Fatal error in consensus engine");
306                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
307                                break
308                            }
309                            ChainEvent::Handler(ev) => {
310                                if let Some(head) = ev.canonical_header() {
311                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
312                                    network_handle.update_sync_state(SyncState::Idle);
313                                                                        let head_block = Head {
314                                        number: head.number(),
315                                        hash: head.hash(),
316                                        difficulty: head.difficulty(),
317                                        timestamp: head.timestamp(),
318                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
319                                    };
320                                    network_handle.update_status(head_block);
321
322                                    let updated = BlockRangeUpdate {
323                                        earliest: provider.earliest_block_number().unwrap_or_default(),
324                                        latest:head.number(),
325                                        latest_hash:head.hash()
326                                    };
327                                    network_handle.update_block_range(updated);
328                                }
329                                event_sender.notify(ev);
330                            }
331                        }
332                    }
333                }
334            }
335
336            let _ = exit.send(res);
337        }));
338
339        let full_node = FullNode {
340            evm_config: ctx.components().evm_config().clone(),
341            pool: ctx.components().pool().clone(),
342            network: ctx.components().network().clone(),
343            provider: ctx.node_adapter().provider.clone(),
344            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
345            task_executor: ctx.task_executor().clone(),
346            config: ctx.node_config().clone(),
347            data_dir: ctx.data_dir().clone(),
348            add_ons_handle: RpcHandle {
349                rpc_server_handles,
350                rpc_registry,
351                engine_events,
352                beacon_engine_handle,
353            },
354        };
355        // Notify on node started
356        on_node_started.on_event(FullNode::clone(&full_node))?;
357
358        ctx.spawn_ethstats().await?;
359
360        let handle = NodeHandle {
361            node_exit_future: NodeExitFuture::new(
362                async { rx.await? },
363                full_node.config.debug.terminate,
364            ),
365            node: full_node,
366        };
367
368        Ok(handle)
369    }
370}