// Source: reth_node_builder/launch/engine.rs

//! Engine node related functionality.

use crate::{
    common::{Attached, LaunchContextWith, WithConfigs},
    hooks::NodeHooks,
    rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
    setup::build_networked_pipeline,
    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
};
use alloy_consensus::BlockHeader;
use futures::{stream_select, StreamExt};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_service::service::{ChainEvent, EngineService};
use reth_engine_tree::{
    engine::{EngineApiRequest, EngineRequestHandler},
    tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
use reth_network_api::BlockDownloaderProvider;
use reth_node_api::{
    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
};
use reth_node_core::{
    dirs::{ChainPath, DataDirPath},
    exit::NodeExitFuture,
    primitives::Head,
};
use reth_node_events::node;
use reth_provider::{
    providers::{BlockchainProvider, NodeTypesForProvider},
    BlockNumReader, MetadataProvider,
};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::warn;

44/// The engine node launcher.
45#[derive(Debug)]
46pub struct EngineNodeLauncher {
47    /// The task executor for the node.
48    pub ctx: LaunchContext,
49
50    /// Temporary configuration for engine tree.
51    /// After engine is stabilized, this should be configured through node builder.
52    pub engine_tree_config: TreeConfig,
53}
54
55impl EngineNodeLauncher {
56    /// Create a new instance of the ethereum node launcher.
57    pub const fn new(
58        task_executor: TaskExecutor,
59        data_dir: ChainPath<DataDirPath>,
60        engine_tree_config: TreeConfig,
61    ) -> Self {
62        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
63    }
64
65    async fn launch_node<T, CB, AO>(
66        self,
67        target: NodeBuilderWithComponents<T, CB, AO>,
68    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
69    where
70        T: FullNodeTypes<
71            Types: NodeTypesForProvider,
72            Provider = BlockchainProvider<
73                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
74            >,
75        >,
76        CB: NodeComponentsBuilder<T>,
77        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
78            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
79    {
80        let Self { ctx, engine_tree_config } = self;
81        let NodeBuilderWithComponents {
82            adapter: NodeTypesAdapter { database },
83            components_builder,
84            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
85            config,
86        } = target;
87        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
88
89        // setup the launch context
90        let ctx = ctx
91            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
92            // load the toml config
93            .with_loaded_toml_config(config)?
94            // add resolved peers
95            .with_resolved_peers()?
96            // attach the database
97            .attach(database.clone())
98            // ensure certain settings take effect
99            .with_adjusted_configs()
100            // Create the provider factory
101            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
102            .inspect(|ctx| {
103                info!(target: "reth::cli", "Database opened");
104                match ctx.provider_factory().storage_settings() {
105                    Ok(settings) => {
106                        info!(
107                            target: "reth::cli",
108                            ?settings,
109                            "Storage settings"
110                        );
111                    },
112                    Err(err) => {
113                        warn!(
114                            target: "reth::cli",
115                            ?err,
116                            "Failed to get storage settings"
117                        );
118                    },
119                }
120            })
121            .with_prometheus_server().await?
122            .inspect(|this| {
123                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
124            })
125            .with_genesis()?
126            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
127                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
128            })
129            .with_metrics_task()
130            // passing FullNodeTypes as type parameter here so that we can build
131            // later the components.
132            .with_blockchain_db::<T, _>(move |provider_factory| {
133                Ok(BlockchainProvider::new(provider_factory)?)
134            })?
135            .with_components(components_builder, on_component_initialized).await?;
136
137        // spawn exexs if any
138        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
139
140        // create pipeline
141        let network_handle = ctx.components().network().clone();
142        let network_client = network_handle.fetch_client().await?;
143        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
144
145        let node_config = ctx.node_config();
146
147        // We always assume that node is syncing after a restart
148        network_handle.update_sync_state(SyncState::Syncing);
149
150        let max_block = ctx.max_block(network_client.clone()).await?;
151
152        let static_file_producer = ctx.static_file_producer();
153        let static_file_producer_events = static_file_producer.lock().events();
154        info!(target: "reth::cli", "StaticFileProducer initialized");
155
156        let consensus = Arc::new(ctx.components().consensus().clone());
157
158        let pipeline = build_networked_pipeline(
159            &ctx.toml_config().stages,
160            network_client.clone(),
161            consensus.clone(),
162            ctx.provider_factory().clone(),
163            ctx.task_executor(),
164            ctx.sync_metrics_tx(),
165            ctx.prune_config(),
166            max_block,
167            static_file_producer,
168            ctx.components().evm_config().clone(),
169            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
170            ctx.era_import_source(),
171        )?;
172
173        // The new engine writes directly to static files. This ensures that they're up to the tip.
174        pipeline.move_to_static_files()?;
175
176        let pipeline_events = pipeline.events();
177
178        let mut pruner_builder = ctx.pruner_builder();
179        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
180            pruner_builder =
181                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
182        }
183        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
184        let pruner_events = pruner.events();
185        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
186
187        let event_sender = EventSender::default();
188
189        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
190
191        // extract the jwt secret from the args if possible
192        let jwt_secret = ctx.auth_jwt_secret()?;
193
194        let add_ons_ctx = AddOnsContext {
195            node: ctx.node_adapter().clone(),
196            config: ctx.node_config(),
197            beacon_engine_handle: beacon_engine_handle.clone(),
198            jwt_secret,
199            engine_events: event_sender.clone(),
200        };
201        let validator_builder = add_ons.engine_validator_builder();
202
203        // Build the engine validator with all required components
204        let engine_validator = validator_builder
205            .clone()
206            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
207            .await?;
208
209        // Create the consensus engine stream with optional reorg
210        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
211            .maybe_skip_fcu(node_config.debug.skip_fcu)
212            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
213            .maybe_reorg(
214                ctx.blockchain_db().clone(),
215                ctx.components().evm_config().clone(),
216                || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
217                node_config.debug.reorg_frequency,
218                node_config.debug.reorg_depth,
219            )
220            .await?
221            // Store messages _after_ skipping so that `replay-engine` command
222            // would replay only the messages that were observed by the engine
223            // during this run.
224            .maybe_store_messages(node_config.debug.engine_api_store.clone());
225
226        let mut engine_service = EngineService::new(
227            consensus.clone(),
228            ctx.chain_spec(),
229            network_client.clone(),
230            Box::pin(consensus_engine_stream),
231            pipeline,
232            Box::new(ctx.task_executor().clone()),
233            ctx.provider_factory().clone(),
234            ctx.blockchain_db().clone(),
235            pruner,
236            ctx.components().payload_builder_handle().clone(),
237            engine_validator,
238            engine_tree_config,
239            ctx.sync_metrics_tx(),
240            ctx.components().evm_config().clone(),
241        );
242
243        info!(target: "reth::cli", "Consensus engine initialized");
244
245        #[allow(clippy::needless_continue)]
246        let events = stream_select!(
247            event_sender.new_listener().map(Into::into),
248            pipeline_events.map(Into::into),
249            ctx.consensus_layer_events(),
250            pruner_events.map(Into::into),
251            static_file_producer_events.map(Into::into),
252        );
253
254        ctx.task_executor().spawn_critical(
255            "events task",
256            Box::pin(node::handle_events(
257                Some(Box::new(ctx.components().network().clone())),
258                Some(ctx.head().number),
259                events,
260            )),
261        );
262
263        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
264            add_ons.launch_add_ons(add_ons_ctx).await?;
265
266        // Run consensus engine to completion
267        let initial_target = ctx.initial_backfill_target()?;
268        let mut built_payloads = ctx
269            .components()
270            .payload_builder_handle()
271            .subscribe()
272            .await
273            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
274            .into_built_payload_stream()
275            .fuse();
276
277        let chainspec = ctx.chain_spec();
278        let provider = ctx.blockchain_db().clone();
279        let (exit, rx) = oneshot::channel();
280        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
281        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
282
283        info!(target: "reth::cli", "Starting consensus engine");
284        ctx.task_executor().spawn_critical("consensus engine", Box::pin(async move {
285            if let Some(initial_target) = initial_target {
286                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
287                // network_handle's sync state is already initialized at Syncing
288                engine_service.orchestrator_mut().start_backfill_sync(initial_target);
289            } else if startup_sync_state_idle {
290                network_handle.update_sync_state(SyncState::Idle);
291            }
292
293            let mut res = Ok(());
294
295            // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
296            loop {
297                tokio::select! {
298                    payload = built_payloads.select_next_some() => {
299                        if let Some(executed_block) = payload.executed_block() {
300                            debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(),  "inserting built payload");
301                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
302                        }
303                    }
304                    event = engine_service.next() => {
305                        let Some(event) = event else { break };
306                        debug!(target: "reth::cli", "Event: {event}");
307                        match event {
308                            ChainEvent::BackfillSyncFinished => {
309                                if terminate_after_backfill {
310                                    debug!(target: "reth::cli", "Terminating after initial backfill");
311                                    break
312                                }
313                                if startup_sync_state_idle {
314                                    network_handle.update_sync_state(SyncState::Idle);
315                                }
316                            }
317                            ChainEvent::BackfillSyncStarted => {
318                                network_handle.update_sync_state(SyncState::Syncing);
319                            }
320                            ChainEvent::FatalError => {
321                                error!(target: "reth::cli", "Fatal error in consensus engine");
322                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
323                                break
324                            }
325                            ChainEvent::Handler(ev) => {
326                                if let Some(head) = ev.canonical_header() {
327                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
328                                    network_handle.update_sync_state(SyncState::Idle);
329                                                                        let head_block = Head {
330                                        number: head.number(),
331                                        hash: head.hash(),
332                                        difficulty: head.difficulty(),
333                                        timestamp: head.timestamp(),
334                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
335                                    };
336                                    network_handle.update_status(head_block);
337
338                                    let updated = BlockRangeUpdate {
339                                        earliest: provider.earliest_block_number().unwrap_or_default(),
340                                        latest:head.number(),
341                                        latest_hash:head.hash()
342                                    };
343                                    network_handle.update_block_range(updated);
344                                }
345                                event_sender.notify(ev);
346                            }
347                        }
348                    }
349                }
350            }
351
352            let _ = exit.send(res);
353        }));
354
355        let full_node = FullNode {
356            evm_config: ctx.components().evm_config().clone(),
357            pool: ctx.components().pool().clone(),
358            network: ctx.components().network().clone(),
359            provider: ctx.node_adapter().provider.clone(),
360            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
361            task_executor: ctx.task_executor().clone(),
362            config: ctx.node_config().clone(),
363            data_dir: ctx.data_dir().clone(),
364            add_ons_handle: RpcHandle {
365                rpc_server_handles,
366                rpc_registry,
367                engine_events,
368                beacon_engine_handle,
369            },
370        };
371        // Notify on node started
372        on_node_started.on_event(FullNode::clone(&full_node))?;
373
374        ctx.spawn_ethstats().await?;
375
376        let handle = NodeHandle {
377            node_exit_future: NodeExitFuture::new(
378                async { rx.await? },
379                full_node.config.debug.terminate,
380            ),
381            node: full_node,
382        };
383
384        Ok(handle)
385    }
386}
387
388impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
389where
390    T: FullNodeTypes<
391        Types: NodeTypesForProvider,
392        Provider = BlockchainProvider<
393            NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
394        >,
395    >,
396    CB: NodeComponentsBuilder<T> + 'static,
397    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
398        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
399        + 'static,
400{
401    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
402    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
403
404    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
405        Box::pin(self.launch_node(target))
406    }
407}