reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use alloy_consensus::BlockHeader;
4use futures::{future::Either, stream, stream_select, StreamExt};
5use reth_chainspec::{EthChainSpec, EthereumHardforks};
6use reth_db_api::{database_metrics::DatabaseMetrics, Database};
7use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
8use reth_engine_service::service::{ChainEvent, EngineService};
9use reth_engine_tree::{
10    engine::{EngineApiRequest, EngineRequestHandler},
11    tree::TreeConfig,
12};
13use reth_engine_util::EngineMessageStreamExt;
14use reth_exex::ExExManagerHandle;
15use reth_network::{NetworkSyncUpdater, SyncState};
16use reth_network_api::BlockDownloaderProvider;
17use reth_node_api::{
18    BeaconConsensusEngineHandle, BuiltPayload, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
19    PayloadAttributesBuilder, PayloadTypes,
20};
21use reth_node_core::{
22    dirs::{ChainPath, DataDirPath},
23    exit::NodeExitFuture,
24    primitives::Head,
25};
26use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
27use reth_provider::providers::{BlockchainProvider, NodeTypesForProvider};
28use reth_tasks::TaskExecutor;
29use reth_tokio_util::EventSender;
30use reth_tracing::tracing::{debug, error, info};
31use std::sync::Arc;
32use tokio::sync::{mpsc::unbounded_channel, oneshot};
33use tokio_stream::wrappers::UnboundedReceiverStream;
34
35use crate::{
36    common::{Attached, LaunchContextWith, WithConfigs},
37    hooks::NodeHooks,
38    rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
39    setup::build_networked_pipeline,
40    AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
41    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
42};
43
44/// The engine node launcher.
45#[derive(Debug)]
46pub struct EngineNodeLauncher {
47    /// The task executor for the node.
48    pub ctx: LaunchContext,
49
50    /// Temporary configuration for engine tree.
51    /// After engine is stabilized, this should be configured through node builder.
52    pub engine_tree_config: TreeConfig,
53}
54
55impl EngineNodeLauncher {
56    /// Create a new instance of the ethereum node launcher.
57    pub const fn new(
58        task_executor: TaskExecutor,
59        data_dir: ChainPath<DataDirPath>,
60        engine_tree_config: TreeConfig,
61    ) -> Self {
62        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
63    }
64}
65
66impl<Types, DB, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
67where
68    Types: NodeTypesForProvider + NodeTypes,
69    DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
70    T: FullNodeTypes<
71        Types = Types,
72        DB = DB,
73        Provider = BlockchainProvider<NodeTypesWithDBAdapter<Types, DB>>,
74    >,
75    CB: NodeComponentsBuilder<T>,
76    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
77        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
78    LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
79        <<Types as NodeTypes>::Payload as PayloadTypes>::PayloadAttributes,
80    >,
81{
82    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
83
84    async fn launch_node(
85        self,
86        target: NodeBuilderWithComponents<T, CB, AO>,
87    ) -> eyre::Result<Self::Node> {
88        let Self { ctx, engine_tree_config } = self;
89        let NodeBuilderWithComponents {
90            adapter: NodeTypesAdapter { database },
91            components_builder,
92            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
93            config,
94        } = target;
95        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
96
97        // setup the launch context
98        let ctx = ctx
99            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
100            // load the toml config
101            .with_loaded_toml_config(config)?
102            // add resolved peers
103            .with_resolved_peers()?
104            // attach the database
105            .attach(database.clone())
106            // ensure certain settings take effect
107            .with_adjusted_configs()
108            // Create the provider factory
109            .with_provider_factory().await?
110            .inspect(|_| {
111                info!(target: "reth::cli", "Database opened");
112            })
113            .with_prometheus_server().await?
114            .inspect(|this| {
115                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
116            })
117            .with_genesis()?
118            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
119                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
120            })
121            .with_metrics_task()
122            // passing FullNodeTypes as type parameter here so that we can build
123            // later the components.
124            .with_blockchain_db::<T, _>(move |provider_factory| {
125                Ok(BlockchainProvider::new(provider_factory)?)
126            })?
127            .with_components(components_builder, on_component_initialized).await?;
128
129        // spawn exexs
130        let exex_manager_handle = ExExLauncher::new(
131            ctx.head(),
132            ctx.node_adapter().clone(),
133            installed_exex,
134            ctx.configs().clone(),
135        )
136        .launch()
137        .await?;
138
139        // create pipeline
140        let network_client = ctx.components().network().fetch_client().await?;
141        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
142
143        let node_config = ctx.node_config();
144
145        let max_block = ctx.max_block(network_client.clone()).await?;
146
147        let static_file_producer = ctx.static_file_producer();
148        let static_file_producer_events = static_file_producer.lock().events();
149        info!(target: "reth::cli", "StaticFileProducer initialized");
150
151        let consensus = Arc::new(ctx.components().consensus().clone());
152
153        // Configure the pipeline
154        let pipeline_exex_handle =
155            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
156        let pipeline = build_networked_pipeline(
157            &ctx.toml_config().stages,
158            network_client.clone(),
159            consensus.clone(),
160            ctx.provider_factory().clone(),
161            ctx.task_executor(),
162            ctx.sync_metrics_tx(),
163            ctx.prune_config(),
164            max_block,
165            static_file_producer,
166            ctx.components().block_executor().clone(),
167            pipeline_exex_handle,
168        )?;
169
170        // The new engine writes directly to static files. This ensures that they're up to the tip.
171        pipeline.move_to_static_files()?;
172
173        let pipeline_events = pipeline.events();
174
175        let mut pruner_builder = ctx.pruner_builder();
176        if let Some(exex_manager_handle) = &exex_manager_handle {
177            pruner_builder =
178                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
179        }
180        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
181        let pruner_events = pruner.events();
182        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
183
184        let event_sender = EventSender::default();
185
186        let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
187
188        // extract the jwt secret from the args if possible
189        let jwt_secret = ctx.auth_jwt_secret()?;
190
191        let add_ons_ctx = AddOnsContext {
192            node: ctx.node_adapter().clone(),
193            config: ctx.node_config(),
194            beacon_engine_handle: beacon_engine_handle.clone(),
195            jwt_secret,
196            engine_events: event_sender.clone(),
197        };
198        let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
199
200        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
201            .maybe_skip_fcu(node_config.debug.skip_fcu)
202            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
203            .maybe_reorg(
204                ctx.blockchain_db().clone(),
205                ctx.components().evm_config().clone(),
206                engine_payload_validator.clone(),
207                node_config.debug.reorg_frequency,
208                node_config.debug.reorg_depth,
209            )
210            // Store messages _after_ skipping so that `replay-engine` command
211            // would replay only the messages that were observed by the engine
212            // during this run.
213            .maybe_store_messages(node_config.debug.engine_api_store.clone());
214
215        let mut engine_service = if ctx.is_dev() {
216            let eth_service = LocalEngineService::new(
217                consensus.clone(),
218                ctx.components().block_executor().clone(),
219                ctx.provider_factory().clone(),
220                ctx.blockchain_db().clone(),
221                pruner,
222                ctx.components().payload_builder_handle().clone(),
223                engine_payload_validator,
224                engine_tree_config,
225                ctx.invalid_block_hook()?,
226                ctx.sync_metrics_tx(),
227                consensus_engine_tx.clone(),
228                Box::pin(consensus_engine_stream),
229                ctx.dev_mining_mode(ctx.components().pool()),
230                LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
231                ctx.components().evm_config().clone(),
232            );
233
234            Either::Left(eth_service)
235        } else {
236            let eth_service = EngineService::new(
237                consensus.clone(),
238                ctx.components().block_executor().clone(),
239                ctx.chain_spec(),
240                network_client.clone(),
241                Box::pin(consensus_engine_stream),
242                pipeline,
243                Box::new(ctx.task_executor().clone()),
244                ctx.provider_factory().clone(),
245                ctx.blockchain_db().clone(),
246                pruner,
247                ctx.components().payload_builder_handle().clone(),
248                engine_payload_validator,
249                engine_tree_config,
250                ctx.invalid_block_hook()?,
251                ctx.sync_metrics_tx(),
252                ctx.components().evm_config().clone(),
253            );
254
255            Either::Right(eth_service)
256        };
257
258        info!(target: "reth::cli", "Consensus engine initialized");
259
260        let events = stream_select!(
261            event_sender.new_listener().map(Into::into),
262            pipeline_events.map(Into::into),
263            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
264                Either::Left(
265                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
266                        .map(Into::into),
267                )
268            } else {
269                Either::Right(stream::empty())
270            },
271            pruner_events.map(Into::into),
272            static_file_producer_events.map(Into::into),
273        );
274
275        ctx.task_executor().spawn_critical(
276            "events task",
277            node::handle_events(
278                Some(Box::new(ctx.components().network().clone())),
279                Some(ctx.head().number),
280                events,
281            ),
282        );
283
284        let RpcHandle { rpc_server_handles, rpc_registry, engine_events, beacon_engine_handle } =
285            add_ons.launch_add_ons(add_ons_ctx).await?;
286
287        // Run consensus engine to completion
288        let initial_target = ctx.initial_backfill_target()?;
289        let network_handle = ctx.components().network().clone();
290        let mut built_payloads = ctx
291            .components()
292            .payload_builder_handle()
293            .subscribe()
294            .await
295            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
296            .into_built_payload_stream()
297            .fuse();
298        let chainspec = ctx.chain_spec();
299        let (exit, rx) = oneshot::channel();
300        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
301
302        info!(target: "reth::cli", "Starting consensus engine");
303        ctx.task_executor().spawn_critical("consensus engine", async move {
304            if let Some(initial_target) = initial_target {
305                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
306                if let Either::Right(eth_service) = &mut engine_service {
307                    eth_service.orchestrator_mut().start_backfill_sync(initial_target);
308                }
309            }
310
311            let mut res = Ok(());
312
313            // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
314            loop {
315                tokio::select! {
316                    payload = built_payloads.select_next_some() => {
317                        if let Some(executed_block) = payload.executed_block() {
318                            debug!(target: "reth::cli", block=?executed_block.recovered_block().num_hash(),  "inserting built payload");
319                            if let Either::Right(eth_service) = &mut engine_service {
320                                eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
321                            }
322                        }
323                    }
324                    event = engine_service.next() => {
325                        let Some(event) = event else { break };
326                        debug!(target: "reth::cli", "Event: {event}");
327                        match event {
328                            ChainEvent::BackfillSyncFinished => {
329                                if terminate_after_backfill {
330                                    debug!(target: "reth::cli", "Terminating after initial backfill");
331                                    break
332                                }
333
334                                network_handle.update_sync_state(SyncState::Idle);
335                            }
336                            ChainEvent::BackfillSyncStarted => {
337                                network_handle.update_sync_state(SyncState::Syncing);
338                            }
339                            ChainEvent::FatalError => {
340                                error!(target: "reth::cli", "Fatal error in consensus engine");
341                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
342                                break
343                            }
344                            ChainEvent::Handler(ev) => {
345                                if let Some(head) = ev.canonical_header() {
346                                    let head_block = Head {
347                                        number: head.number(),
348                                        hash: head.hash(),
349                                        difficulty: head.difficulty(),
350                                        timestamp: head.timestamp(),
351                                        total_difficulty: chainspec.final_paris_total_difficulty().filter(|_| chainspec.is_paris_active_at_block(head.number())).unwrap_or_default(),
352                                    };
353                                    network_handle.update_status(head_block);
354                                }
355                                event_sender.notify(ev);
356                            }
357                        }
358                    }
359                }
360            }
361
362            let _ = exit.send(res);
363        });
364
365        let full_node = FullNode {
366            evm_config: ctx.components().evm_config().clone(),
367            block_executor: ctx.components().block_executor().clone(),
368            pool: ctx.components().pool().clone(),
369            network: ctx.components().network().clone(),
370            provider: ctx.node_adapter().provider.clone(),
371            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
372            task_executor: ctx.task_executor().clone(),
373            config: ctx.node_config().clone(),
374            data_dir: ctx.data_dir().clone(),
375            add_ons_handle: RpcHandle {
376                rpc_server_handles,
377                rpc_registry,
378                engine_events,
379                beacon_engine_handle,
380            },
381        };
382        // Notify on node started
383        on_node_started.on_event(FullNode::clone(&full_node))?;
384
385        let handle = NodeHandle {
386            node_exit_future: NodeExitFuture::new(
387                async { rx.await? },
388                full_node.config.debug.terminate,
389            ),
390            node: full_node,
391        };
392
393        Ok(handle)
394    }
395}