reth_node_builder/launch/
exex.rs

1//! Support for launching execution extensions.
2
3use alloy_eips::BlockNumHash;
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8    ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9    DEFAULT_EXEX_MANAGER_CAPACITY,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_primitives::Head;
13use reth_provider::CanonStateSubscriptions;
14use reth_tracing::tracing::{debug, info};
15use std::{fmt, fmt::Debug};
16use tracing::Instrument;
17
18use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
19
20/// Can launch execution extensions.
21pub struct ExExLauncher<Node: FullNodeComponents> {
22    head: Head,
23    extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
24    components: Node,
25    config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
26}
27
28impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
29    /// Create a new `ExExLauncher` with the given extensions.
30    pub const fn new(
31        head: Head,
32        components: Node,
33        extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
34        config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
35    ) -> Self {
36        Self { head, extensions, components, config_container }
37    }
38
39    /// Launches all execution extensions.
40    ///
41    /// Spawns all extensions and returns the handle to the exex manager if any extensions are
42    /// installed.
43    pub async fn launch(
44        self,
45    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
46        let Self { head, extensions, components, config_container } = self;
47        let head = BlockNumHash::new(head.number, head.hash);
48
49        if extensions.is_empty() {
50            // nothing to launch
51            return Ok(None)
52        }
53
54        info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
55        let exex_wal = Wal::new(
56            config_container
57                .config
58                .datadir
59                .clone()
60                .resolve_datadir(config_container.config.chain.chain())
61                .exex_wal(),
62        )?;
63
64        let mut exex_handles = Vec::with_capacity(extensions.len());
65        let mut exexes = Vec::with_capacity(extensions.len());
66
67        for (id, exex) in extensions {
68            // create a new exex handle
69            let (handle, events, notifications) = ExExHandle::new(
70                id.clone(),
71                head,
72                components.provider().clone(),
73                components.block_executor().clone(),
74                exex_wal.handle(),
75            );
76            exex_handles.push(handle);
77
78            // create the launch context for the exex
79            let context = ExExContext {
80                head,
81                config: config_container.config.clone(),
82                reth_config: config_container.toml_config.clone(),
83                components: components.clone(),
84                events,
85                notifications,
86            };
87
88            let executor = components.task_executor().clone();
89            exexes.push(async move {
90                debug!(target: "reth::cli", id, "spawning exex");
91                let span = reth_tracing::tracing::info_span!("exex", id);
92
93                // init the exex
94                let exex = exex.launch(context).instrument(span.clone()).await.unwrap();
95
96                // spawn it as a crit task
97                executor.spawn_critical(
98                    "exex",
99                    async move {
100                        info!(target: "reth::cli", "ExEx started");
101                        match exex.await {
102                            Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
103                            Err(err) => panic!("ExEx {id} crashed: {err}"),
104                        }
105                    }
106                    .instrument(span),
107                );
108            });
109        }
110
111        future::join_all(exexes).await;
112
113        // spawn exex manager
114        debug!(target: "reth::cli", "spawning exex manager");
115        let exex_manager = ExExManager::new(
116            components.provider().clone(),
117            exex_handles,
118            DEFAULT_EXEX_MANAGER_CAPACITY,
119            exex_wal,
120            components.provider().finalized_block_stream(),
121        );
122        let exex_manager_handle = exex_manager.handle();
123        components.task_executor().spawn_critical("exex manager", async move {
124            exex_manager.await.expect("exex manager crashed");
125        });
126
127        // send notifications from the blockchain tree to exex manager
128        let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
129        let mut handle = exex_manager_handle.clone();
130        components.task_executor().spawn_critical(
131            "exex manager blockchain tree notifications",
132            async move {
133                while let Ok(notification) = canon_state_notifications.recv().await {
134                    handle
135                        .send_async(ExExNotificationSource::BlockchainTree, notification.into())
136                        .await
137                        .expect("blockchain tree notification could not be sent to exex manager");
138                }
139            },
140        );
141
142        info!(target: "reth::cli", "ExEx Manager started");
143
144        Ok(Some(exex_manager_handle))
145    }
146}
147
148impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150        f.debug_struct("ExExLauncher")
151            .field("head", &self.head)
152            .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
153            .field("components", &"...")
154            .field("config_container", &self.config_container)
155            .finish()
156    }
157}