reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use crate::{
4    common::{Attached, LaunchContextWith, WithConfigs},
5    hooks::NodeHooks,
6    rpc::{EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
7    setup::build_networked_pipeline,
8    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
9    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
10};
11use alloy_consensus::BlockHeader;
12use futures::{stream_select, StreamExt};
13use reth_chainspec::{EthChainSpec, EthereumHardforks};
14use reth_engine_service::service::{ChainEvent, EngineService};
15use reth_engine_tree::{
16    engine::{EngineApiRequest, EngineRequestHandler},
17    tree::TreeConfig,
18};
19use reth_engine_util::EngineMessageStreamExt;
20use reth_exex::ExExManagerHandle;
21use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
22use reth_network_api::BlockDownloaderProvider;
23use reth_node_api::{
24    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
25};
26use reth_node_core::{
27    dirs::{ChainPath, DataDirPath},
28    exit::NodeExitFuture,
29    primitives::Head,
30};
31use reth_node_events::node;
32use reth_provider::{
33    providers::{BlockchainProvider, NodeTypesForProvider},
34    BlockNumReader,
35};
36use reth_tasks::TaskExecutor;
37use reth_tokio_util::EventSender;
38use reth_tracing::tracing::{debug, error, info};
39use std::{future::Future, pin::Pin, sync::Arc};
40use tokio::sync::{mpsc::unbounded_channel, oneshot};
41use tokio_stream::wrappers::UnboundedReceiverStream;
42
43/// The engine node launcher.
44#[derive(Debug)]
45pub struct EngineNodeLauncher {
46    /// The task executor for the node.
47    pub ctx: LaunchContext,
48
49    /// Temporary configuration for engine tree.
50    /// After engine is stabilized, this should be configured through node builder.
51    pub engine_tree_config: TreeConfig,
52}
53
54impl EngineNodeLauncher {
55    /// Create a new instance of the ethereum node launcher.
56    pub const fn new(
57        task_executor: TaskExecutor,
58        data_dir: ChainPath<DataDirPath>,
59        engine_tree_config: TreeConfig,
60    ) -> Self {
61        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
62    }
63
64    async fn launch_node<T, CB, AO>(
65        self,
66        target: NodeBuilderWithComponents<T, CB, AO>,
67    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
68    where
69        T: FullNodeTypes<
70            Types: NodeTypesForProvider,
71            Provider = BlockchainProvider<
72                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
73            >,
74        >,
75        CB: NodeComponentsBuilder<T>,
76        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
77            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
78    {
79        let Self { ctx, engine_tree_config } = self;
80        let NodeBuilderWithComponents {
81            adapter: NodeTypesAdapter { database },
82            components_builder,
83            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
84            config,
85        } = target;
86        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
87
88        // setup the launch context
89        let ctx = ctx
90            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
91            // load the toml config
92            .with_loaded_toml_config(config)?
93            // add resolved peers
94            .with_resolved_peers()?
95            // attach the database
96            .attach(database.clone())
97            // ensure certain settings take effect
98            .with_adjusted_configs()
99            // Create the provider factory
100            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>().await?
101            .inspect(|_| {
102                info!(target: "reth::cli", "Database opened");
103            })
104            .with_prometheus_server().await?
105            .inspect(|this| {
106                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
107            })
108            .with_genesis()?
109            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>>| {
110                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
111            })
112            .with_metrics_task()
113            // passing FullNodeTypes as type parameter here so that we can build
114            // later the components.
115            .with_blockchain_db::<T, _>(move |provider_factory| {
116                Ok(BlockchainProvider::new(provider_factory)?)
117            })?
118            .with_components(components_builder, on_component_initialized).await?;
119
120        // Try to expire pre-merge transaction history if configured
121        ctx.expire_pre_merge_transactions()?;
122
123        // spawn exexs if any
124        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
125
126        // create pipeline
127        let network_handle = ctx.components().network().clone();
128        let network_client = network_handle.fetch_client().await?;
129        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
130
131        let node_config = ctx.node_config();
132
133        // We always assume that node is syncing after a restart
134        network_handle.update_sync_state(SyncState::Syncing);
135
136        let max_block = ctx.max_block(network_client.clone()).await?;
137
138        let static_file_producer = ctx.static_file_producer();
139        let static_file_producer_events = static_file_producer.lock().events();
140        info!(target: "reth::cli", "StaticFileProducer initialized");
141
142        let consensus = Arc::new(ctx.components().consensus().clone());
143
144        let pipeline = build_networked_pipeline(
145            &ctx.toml_config().stages,
146            network_client.clone(),
147            consensus.clone(),
148            ctx.provider_factory().clone(),
149            ctx.task_executor(),
150            ctx.sync_metrics_tx(),
151            ctx.prune_config(),
152            max_block,
153            static_file_producer,
154            ctx.components().evm_config().clone(),
155            maybe_exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty),
156            ctx.era_import_source(),
157        )?;
158
159        // The new engine writes directly to static files. This ensures that they're up to the tip.
160        pipeline.move_to_static_files()?;
161
162        let pipeline_events = pipeline.events();
163
164        let mut pruner_builder = ctx.pruner_builder();
165        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
166            pruner_builder =
167                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
168        }
169        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
170        let pruner_events = pruner.events();
171        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
172
173        let event_sender = EventSender::default();
174
175        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
176
177        // extract the jwt secret from the args if possible
178        let jwt_secret = ctx.auth_jwt_secret()?;
179
180        let add_ons_ctx = AddOnsContext {
181            node: ctx.node_adapter().clone(),
182            config: ctx.node_config(),
183            beacon_engine_handle: beacon_engine_handle.clone(),
184            jwt_secret,
185            engine_events: event_sender.clone(),
186        };
187        let validator_builder = add_ons.engine_validator_builder();
188
189        // Build the engine validator with all required components
190        let engine_validator = validator_builder
191            .clone()
192            .build_tree_validator(&add_ons_ctx, engine_tree_config.clone())
193            .await?;
194
195        // Create the consensus engine stream with optional reorg
196        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
197            .maybe_skip_fcu(node_config.debug.skip_fcu)
198            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
199            .maybe_reorg(
200                ctx.blockchain_db().clone(),
201                ctx.components().evm_config().clone(),
202                || validator_builder.build_tree_validator(&add_ons_ctx, engine_tree_config.clone()),
203                node_config.debug.reorg_frequency,
204                node_config.debug.reorg_depth,
205            )
206            .await?
207            // Store messages _after_ skipping so that `replay-engine` command
208            // would replay only the messages that were observed by the engine
209            // during this run.
210            .maybe_store_messages(node_config.debug.engine_api_store.clone());
211
212        let mut engine_service = EngineService::new(
213            consensus.clone(),
214            ctx.chain_spec(),
215            network_client.clone(),
216            Box::pin(consensus_engine_stream),
217            pipeline,
218            Box::new(ctx.task_executor().clone()),
219            ctx.provider_factory().clone(),
220            ctx.blockchain_db().clone(),
221            pruner,
222            ctx.components().payload_builder_handle().clone(),
223            engine_validator,
224            engine_tree_config,
225            ctx.sync_metrics_tx(),
226            ctx.components().evm_config().clone(),
227        );
228
229        info!(target: "reth::cli", "Consensus engine initialized");
230
231        let events = stream_select!(
232            event_sender.new_listener().map(Into::into),
233            pipeline_events.map(Into::into),
234            ctx.consensus_layer_events(),
235            pruner_events.map(Into::into),
236            static_file_producer_events.map(Into::into),
237        );
238
239        ctx.task_executor().spawn_critical(
240            "events task",
241            Box::pin(node::handle_events(
242                Some(Box::new(ctx.components().network().clone())),
243                Some(ctx.head().number),
244                events,
245            )),
246        );
247
248        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
249            add_ons.launch_add_ons(add_ons_ctx).await?;
250
251        // Run consensus engine to completion
252        let initial_target = ctx.initial_backfill_target()?;
253        let mut built_payloads = ctx
254            .components()
255            .payload_builder_handle()
256            .subscribe()
257            .await
258            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
259            .into_built_payload_stream()
260            .fuse();
261
262        let chainspec = ctx.chain_spec();
263        let provider = ctx.blockchain_db().clone();
264        let (exit, rx) = oneshot::channel();
265        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
266
267        info!(target: "reth::cli", "Starting consensus engine");
268        ctx.task_executor().spawn_critical("consensus engine", Box::pin(async move {
269            if let Some(initial_target) = initial_target {
270                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
271                engine_service.orchestrator_mut().start_backfill_sync(initial_target);
272            }
273
274            let mut res = Ok(());
275
276            // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
277            loop {
278                tokio::select! {
279                    payload = built_payloads.select_next_some() => {
280                        if let Some(executed_block) = payload.executed_block() {
281                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(),  "inserting built payload");
282                            engine_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
283                        }
284                    }
285                    event = engine_service.next() => {
286                        let Some(event) = event else { break };
287                        debug!(target: "reth::cli", "Event: {event}");
288                        match event {
289                            ChainEvent::BackfillSyncFinished => {
290                                if terminate_after_backfill {
291                                    debug!(target: "reth::cli", "Terminating after initial backfill");
292                                    break
293                                }
294                            }
295                            ChainEvent::BackfillSyncStarted => {
296                                network_handle.update_sync_state(SyncState::Syncing);
297                            }
298                            ChainEvent::FatalError => {
299                                error!(target: "reth::cli", "Fatal error in consensus engine");
300                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
301                                break
302                            }
303                            ChainEvent::Handler(ev) => {
304                                if let Some(head) = ev.canonical_header() {
305                                    // Once we're progressing via live sync, we can consider the node is not syncing anymore
306                                    network_handle.update_sync_state(SyncState::Idle);
307                                                                        let head_block = Head {
308                                        number: head.number(),
309                                        hash: head.hash(),
310                                        difficulty: head.difficulty(),
311                                        timestamp: head.timestamp(),
312                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
313                                    };
314                                    network_handle.update_status(head_block);
315
316                                    let updated = BlockRangeUpdate {
317                                        earliest: provider.earliest_block_number().unwrap_or_default(),
318                                        latest:head.number(),
319                                        latest_hash:head.hash()
320                                    };
321                                    network_handle.update_block_range(updated);
322                                }
323                                event_sender.notify(ev);
324                            }
325                        }
326                    }
327                }
328            }
329
330            let _ = exit.send(res);
331        }));
332
333        let full_node = FullNode {
334            evm_config: ctx.components().evm_config().clone(),
335            pool: ctx.components().pool().clone(),
336            network: ctx.components().network().clone(),
337            provider: ctx.node_adapter().provider.clone(),
338            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
339            task_executor: ctx.task_executor().clone(),
340            config: ctx.node_config().clone(),
341            data_dir: ctx.data_dir().clone(),
342            add_ons_handle: RpcHandle {
343                rpc_server_handles,
344                rpc_registry,
345                engine_events,
346                beacon_engine_handle,
347            },
348        };
349        // Notify on node started
350        on_node_started.on_event(FullNode::clone(&full_node))?;
351
352        ctx.spawn_ethstats().await?;
353
354        let handle = NodeHandle {
355            node_exit_future: NodeExitFuture::new(
356                async { rx.await? },
357                full_node.config.debug.terminate,
358            ),
359            node: full_node,
360        };
361
362        Ok(handle)
363    }
364}
365
366impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
367where
368    T: FullNodeTypes<
369        Types: NodeTypesForProvider,
370        Provider = BlockchainProvider<
371            NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
372        >,
373    >,
374    CB: NodeComponentsBuilder<T> + 'static,
375    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
376        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
377        + 'static,
378{
379    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
380    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
381
382    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
383        Box::pin(self.launch_node(target))
384    }
385}