reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use alloy_consensus::BlockHeader;
4use futures::{future::Either, stream, stream_select, StreamExt};
5use reth_chainspec::EthChainSpec;
6use reth_db_api::{database_metrics::DatabaseMetrics, Database};
7use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
8use reth_engine_service::service::{ChainEvent, EngineService};
9use reth_engine_tree::{
10    engine::{EngineApiRequest, EngineRequestHandler},
11    tree::TreeConfig,
12};
13use reth_engine_util::EngineMessageStreamExt;
14use reth_exex::ExExManagerHandle;
15use reth_network::{NetworkSyncUpdater, SyncState};
16use reth_network_api::BlockDownloaderProvider;
17use reth_node_api::{
18    BeaconConsensusEngineHandle, BuiltPayload, FullNodeTypes, NodeTypesWithDBAdapter,
19    NodeTypesWithEngine, PayloadAttributesBuilder, PayloadTypes,
20};
21use reth_node_core::{
22    dirs::{ChainPath, DataDirPath},
23    exit::NodeExitFuture,
24    primitives::Head,
25};
26use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
27use reth_primitives::EthereumHardforks;
28use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
29use reth_tasks::TaskExecutor;
30use reth_tokio_util::EventSender;
31use reth_tracing::tracing::{debug, error, info};
32use std::sync::Arc;
33use tokio::sync::{mpsc::unbounded_channel, oneshot};
34use tokio_stream::wrappers::UnboundedReceiverStream;
35
36use crate::{
37    common::{Attached, LaunchContextWith, WithConfigs},
38    hooks::NodeHooks,
39    rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
40    setup::build_networked_pipeline,
41    AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
42    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
43};
44
45/// The engine node launcher.
46#[derive(Debug)]
47pub struct EngineNodeLauncher {
48    /// The task executor for the node.
49    pub ctx: LaunchContext,
50
51    /// Temporary configuration for engine tree.
52    /// After engine is stabilized, this should be configured through node builder.
53    pub engine_tree_config: TreeConfig,
54}
55
56impl EngineNodeLauncher {
57    /// Create a new instance of the ethereum node launcher.
58    pub const fn new(
59        task_executor: TaskExecutor,
60        data_dir: ChainPath<DataDirPath>,
61        engine_tree_config: TreeConfig,
62    ) -> Self {
63        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
64    }
65}
66
67impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
68where
69    Types: NodeTypesForProvider + NodeTypesWithEngine,
70    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
71    T: FullNodeTypes<
72        Types = Types,
73        DB = DB,
74        Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
75    >,
76    CB: NodeComponentsBuilder<T>,
77    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
78        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
79    LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
80        <<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
81    >,
82{
83    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
84
85    async fn launch_node(
86        self,
87        target: NodeBuilderWithComponents<T, CB, AO>,
88    ) -> eyre::Result<Self::Node> {
89        let Self { ctx, engine_tree_config } = self;
90        let NodeBuilderWithComponents {
91            adapter: NodeTypesAdapter { database },
92            components_builder,
93            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
94            config,
95        } = target;
96        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
97
98        // setup the launch context
99        let ctx = ctx
100            .with_configured_globals()
101            // load the toml config
102            .with_loaded_toml_config(config)?
103            // add resolved peers
104            .with_resolved_peers()?
105            // attach the database
106            .attach(database.clone())
107            // ensure certain settings take effect
108            .with_adjusted_configs()
109            // Create the provider factory
110            .with_provider_factory().await?
111            .inspect(|_| {
112                info!(target: "reth::cli", "Database opened");
113            })
114            .with_prometheus_server().await?
115            .inspect(|this| {
116                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
117            })
118            .with_genesis()?
119            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
120                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
121            })
122            .with_metrics_task()
123            // passing FullNodeTypes as type parameter here so that we can build
124            // later the components.
125            .with_blockchain_db::<T, _>(move |provider_factory| {
126                Ok(BlockchainProvider::new(provider_factory)?)
127            })?
128            .with_components(components_builder, on_component_initialized).await?;
129
130        // spawn exexs
131        let exex_manager_handle = ExExLauncher::new(
132            ctx.head(),
133            ctx.node_adapter().clone(),
134            installed_exex,
135            ctx.configs().clone(),
136        )
137        .launch()
138        .await?;
139
140        // create pipeline
141        let network_client = ctx.components().network().fetch_client().await?;
142        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
143
144        let node_config = ctx.node_config();
145
146        let max_block = ctx.max_block(network_client.clone()).await?;
147
148        let static_file_producer = ctx.static_file_producer();
149        let static_file_producer_events = static_file_producer.lock().events();
150        info!(target: "reth::cli", "StaticFileProducer initialized");
151
152        let consensus = Arc::new(ctx.components().consensus().clone());
153
154        // Configure the pipeline
155        let pipeline_exex_handle =
156            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
157        let pipeline = build_networked_pipeline(
158            &ctx.toml_config().stages,
159            network_client.clone(),
160            consensus.clone(),
161            ctx.provider_factory().clone(),
162            ctx.task_executor(),
163            ctx.sync_metrics_tx(),
164            ctx.prune_config(),
165            max_block,
166            static_file_producer,
167            ctx.components().block_executor().clone(),
168            pipeline_exex_handle,
169        )?;
170
171        // The new engine writes directly to static files. This ensures that they're up to the tip.
172        pipeline.move_to_static_files()?;
173
174        let pipeline_events = pipeline.events();
175
176        let mut pruner_builder = ctx.pruner_builder();
177        if let Some(exex_manager_handle) = &exex_manager_handle {
178            pruner_builder =
179                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
180        }
181        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
182        let pruner_events = pruner.events();
183        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
184
185        let event_sender = EventSender::default();
186
187        let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
188
189        // extract the jwt secret from the args if possible
190        let jwt_secret = ctx.auth_jwt_secret()?;
191
192        let add_ons_ctx = AddOnsContext {
193            node: ctx.node_adapter().clone(),
194            config: ctx.node_config(),
195            beacon_engine_handle: beacon_engine_handle.clone(),
196            jwt_secret,
197            engine_events: event_sender.clone(),
198        };
199        let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
200
201        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
202            .maybe_skip_fcu(node_config.debug.skip_fcu)
203            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
204            .maybe_reorg(
205                ctx.blockchain_db().clone(),
206                ctx.components().evm_config().clone(),
207                engine_payload_validator.clone(),
208                node_config.debug.reorg_frequency,
209                node_config.debug.reorg_depth,
210            )
211            // Store messages _after_ skipping so that `replay-engine` command
212            // would replay only the messages that were observed by the engine
213            // during this run.
214            .maybe_store_messages(node_config.debug.engine_api_store.clone());
215
216        let mut engine_service = if ctx.is_dev() {
217            let eth_service = LocalEngineService::new(
218                consensus.clone(),
219                ctx.components().block_executor().clone(),
220                ctx.provider_factory().clone(),
221                ctx.blockchain_db().clone(),
222                pruner,
223                ctx.components().payload_builder_handle().clone(),
224                engine_payload_validator,
225                engine_tree_config,
226                ctx.invalid_block_hook()?,
227                ctx.sync_metrics_tx(),
228                consensus_engine_tx.clone(),
229                Box::pin(consensus_engine_stream),
230                ctx.dev_mining_mode(ctx.components().pool()),
231                LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
232                ctx.components().evm_config().clone(),
233            );
234
235            Either::Left(eth_service)
236        } else {
237            let eth_service = EngineService::new(
238                consensus.clone(),
239                ctx.components().block_executor().clone(),
240                ctx.chain_spec(),
241                network_client.clone(),
242                Box::pin(consensus_engine_stream),
243                pipeline,
244                Box::new(ctx.task_executor().clone()),
245                ctx.provider_factory().clone(),
246                ctx.blockchain_db().clone(),
247                pruner,
248                ctx.components().payload_builder_handle().clone(),
249                engine_payload_validator,
250                engine_tree_config,
251                ctx.invalid_block_hook()?,
252                ctx.sync_metrics_tx(),
253                ctx.components().evm_config().clone(),
254            );
255
256            Either::Right(eth_service)
257        };
258
259        info!(target: "reth::cli", "Consensus engine initialized");
260
261        let events = stream_select!(
262            event_sender.new_listener().map(Into::into),
263            pipeline_events.map(Into::into),
264            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
265                Either::Left(
266                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
267                        .map(Into::into),
268                )
269            } else {
270                Either::Right(stream::empty())
271            },
272            pruner_events.map(Into::into),
273            static_file_producer_events.map(Into::into),
274        );
275
276        ctx.task_executor().spawn_critical(
277            "events task",
278            node::handle_events(
279                Some(Box::new(ctx.components().network().clone())),
280                Some(ctx.head().number),
281                events,
282            ),
283        );
284
285        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
286            add_ons.launch_add_ons(add_ons_ctx).await?;
287
288        // Run consensus engine to completion
289        let initial_target = ctx.initial_backfill_target()?;
290        let network_handle = ctx.components().network().clone();
291        let mut built_payloads = ctx
292            .components()
293            .payload_builder_handle()
294            .subscribe()
295            .await
296            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
297            .into_built_payload_stream()
298            .fuse();
299        let chainspec = ctx.chain_spec();
300        let (exit, rx) = oneshot::channel();
301        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
302
303        info!(target: "reth::cli", "Starting consensus engine");
304        ctx.task_executor().spawn_critical("consensus engine", async move {
305            if let Some(initial_target) = initial_target {
306                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
307                if let Either::Right(eth_service) = &mut engine_service {
308                    eth_service.orchestrator_mut().start_backfill_sync(initial_target);
309                }
310            }
311
312            let mut res = Ok(());
313
314            // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
315            loop {
316                tokio::select! {
317                    payload = built_payloads.select_next_some() => {
318                        if let Some(executed_block) = payload.executed_block() {
319                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(),  "inserting built payload");
320                            if let Either::Right(eth_service) = &mut engine_service {
321                                eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
322                            }
323                        }
324                    }
325                    event = engine_service.next() => {
326                        let Some(event) = event else { break };
327                        debug!(target: "reth::cli", "Event: {event}");
328                        match event {
329                            ChainEvent::BackfillSyncFinished => {
330                                if terminate_after_backfill {
331                                    debug!(target: "reth::cli", "Terminating after initial backfill");
332                                    break
333                                }
334
335                                network_handle.update_sync_state(SyncState::Idle);
336                            }
337                            ChainEvent::BackfillSyncStarted => {
338                                network_handle.update_sync_state(SyncState::Syncing);
339                            }
340                            ChainEvent::FatalError => {
341                                error!(target: "reth::cli", "Fatal error in consensus engine");
342                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
343                                break
344                            }
345                            ChainEvent::Handler(ev) => {
346                                if let Some(head) = ev.canonical_header() {
347                                    let head_block = Head {
348                                        number: head.number(),
349                                        hash: head.hash(),
350                                        difficulty: head.difficulty(),
351                                        timestamp: head.timestamp(),
352                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
353                                    };
354                                    network_handle.update_status(head_block);
355                                }
356                                event_sender.notify(ev);
357                            }
358                        }
359                    }
360                }
361            }
362
363            let _ = exit.send(res);
364        });
365
366        let full_node = FullNode {
367            evm_config: ctx.components().evm_config().clone(),
368            block_executor: ctx.components().block_executor().clone(),
369            pool: ctx.components().pool().clone(),
370            network: ctx.components().network().clone(),
371            provider: ctx.node_adapter().provider.clone(),
372            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
373            task_executor: ctx.task_executor().clone(),
374            config: ctx.node_config().clone(),
375            data_dir: ctx.data_dir().clone(),
376            add_ons_handle: RpcHandle {
377                rpc_server_handles,
378                rpc_registry,
379                engine_events,
380                beacon_engine_handle,
381            },
382        };
383        // Notify on node started
384        on_node_started.on_event(FullNode::clone(&full_node))?;
385
386        let handle = NodeHandle {
387            node_exit_future: NodeExitFuture::new(
388                async { rx.await? },
389                full_node.config.debug.terminate,
390            ),
391            node: full_node,
392        };
393
394        Ok(handle)
395    }
396}