reth_node_builder/launch/
exex.rs

1//! Support for launching execution extensions.
2
3use alloy_eips::{eip2124::Head, BlockNumHash};
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8    ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9    DEFAULT_EXEX_MANAGER_CAPACITY,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_provider::CanonStateSubscriptions;
13use reth_tracing::tracing::{debug, info};
14use std::{fmt, fmt::Debug};
15use tracing::Instrument;
16
17use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
18
19/// Can launch execution extensions.
20pub struct ExExLauncher<Node: FullNodeComponents> {
21    head: Head,
22    extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
23    components: Node,
24    config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
25}
26
27impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
28    /// Create a new `ExExLauncher` with the given extensions.
29    pub const fn new(
30        head: Head,
31        components: Node,
32        extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
33        config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
34    ) -> Self {
35        Self { head, extensions, components, config_container }
36    }
37
38    /// Launches all execution extensions.
39    ///
40    /// Spawns all extensions and returns the handle to the exex manager if any extensions are
41    /// installed.
42    pub async fn launch(
43        self,
44    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
45        let Self { head, extensions, components, config_container } = self;
46        let head = BlockNumHash::new(head.number, head.hash);
47
48        if extensions.is_empty() {
49            // nothing to launch
50            return Ok(None)
51        }
52
53        info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
54        let exex_wal = Wal::new(
55            config_container
56                .config
57                .datadir
58                .clone()
59                .resolve_datadir(config_container.config.chain.chain())
60                .exex_wal(),
61        )?;
62
63        let mut exex_handles = Vec::with_capacity(extensions.len());
64        let mut exexes = Vec::with_capacity(extensions.len());
65
66        for (id, exex) in extensions {
67            // create a new exex handle
68            let (handle, events, notifications) = ExExHandle::new(
69                id.clone(),
70                head,
71                components.provider().clone(),
72                components.block_executor().clone(),
73                exex_wal.handle(),
74            );
75            exex_handles.push(handle);
76
77            // create the launch context for the exex
78            let context = ExExContext {
79                head,
80                config: config_container.config.clone(),
81                reth_config: config_container.toml_config.clone(),
82                components: components.clone(),
83                events,
84                notifications,
85            };
86
87            let executor = components.task_executor().clone();
88            exexes.push(async move {
89                debug!(target: "reth::cli", id, "spawning exex");
90                let span = reth_tracing::tracing::info_span!("exex", id);
91
92                // init the exex
93                let exex = exex.launch(context).instrument(span.clone()).await.unwrap();
94
95                // spawn it as a crit task
96                executor.spawn_critical(
97                    "exex",
98                    async move {
99                        info!(target: "reth::cli", "ExEx started");
100                        match exex.await {
101                            Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
102                            Err(err) => panic!("ExEx {id} crashed: {err}"),
103                        }
104                    }
105                    .instrument(span),
106                );
107            });
108        }
109
110        future::join_all(exexes).await;
111
112        // spawn exex manager
113        debug!(target: "reth::cli", "spawning exex manager");
114        let exex_manager = ExExManager::new(
115            components.provider().clone(),
116            exex_handles,
117            DEFAULT_EXEX_MANAGER_CAPACITY,
118            exex_wal,
119            components.provider().finalized_block_stream(),
120        );
121        let exex_manager_handle = exex_manager.handle();
122        components.task_executor().spawn_critical("exex manager", async move {
123            exex_manager.await.expect("exex manager crashed");
124        });
125
126        // send notifications from the blockchain tree to exex manager
127        let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
128        let mut handle = exex_manager_handle.clone();
129        components.task_executor().spawn_critical(
130            "exex manager blockchain tree notifications",
131            async move {
132                while let Ok(notification) = canon_state_notifications.recv().await {
133                    handle
134                        .send_async(ExExNotificationSource::BlockchainTree, notification.into())
135                        .await
136                        .expect("blockchain tree notification could not be sent to exex manager");
137                }
138            },
139        );
140
141        info!(target: "reth::cli", "ExEx Manager started");
142
143        Ok(Some(exex_manager_handle))
144    }
145}
146
147impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.debug_struct("ExExLauncher")
150            .field("head", &self.head)
151            .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
152            .field("components", &"...")
153            .field("config_container", &self.config_container)
154            .finish()
155    }
156}