reth_node_builder/launch/
exex.rs

1//! Support for launching execution extensions.
2
3use alloy_eips::{eip2124::Head, BlockNumHash};
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8    ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9    DEFAULT_EXEX_MANAGER_CAPACITY, DEFAULT_WAL_BLOCKS_WARNING,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_provider::CanonStateSubscriptions;
13use reth_tracing::tracing::{debug, info};
14use std::{fmt, fmt::Debug};
15use tracing::Instrument;
16
17use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
18
19/// Can launch execution extensions.
20pub struct ExExLauncher<Node: FullNodeComponents> {
21    head: Head,
22    extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
23    components: Node,
24    config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
25    /// The threshold for the number of blocks in the WAL before emitting a warning.
26    wal_blocks_warning: usize,
27}
28
29impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
30    /// Create a new `ExExLauncher` with the given extensions.
31    pub const fn new(
32        head: Head,
33        components: Node,
34        extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
35        config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
36    ) -> Self {
37        Self {
38            head,
39            extensions,
40            components,
41            config_container,
42            wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
43        }
44    }
45
46    /// Sets the threshold for the number of blocks in the WAL before emitting a warning.
47    ///
48    /// For L2 chains with faster block times, this value should be increased proportionally
49    /// to avoid excessive warnings. For example, a chain with 2-second block times might use
50    /// a value 6x higher than the default (768 instead of 128).
51    pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
52        self.wal_blocks_warning = threshold;
53        self
54    }
55
56    /// Launches all execution extensions.
57    ///
58    /// Spawns all extensions and returns the handle to the exex manager if any extensions are
59    /// installed.
60    pub async fn launch(
61        self,
62    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
63        let Self { head, extensions, components, config_container, wal_blocks_warning } = self;
64        let head = BlockNumHash::new(head.number, head.hash);
65
66        if extensions.is_empty() {
67            // nothing to launch
68            return Ok(None)
69        }
70
71        info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
72        let exex_wal = Wal::new(
73            config_container
74                .config
75                .datadir
76                .clone()
77                .resolve_datadir(config_container.config.chain.chain())
78                .exex_wal(),
79        )?;
80
81        let mut exex_handles = Vec::with_capacity(extensions.len());
82        let mut exexes = Vec::with_capacity(extensions.len());
83
84        for (id, exex) in extensions {
85            // create a new exex handle
86            let (handle, events, notifications) = ExExHandle::new(
87                id.clone(),
88                head,
89                components.provider().clone(),
90                components.evm_config().clone(),
91                exex_wal.handle(),
92            );
93            exex_handles.push(handle);
94
95            // create the launch context for the exex
96            let context = ExExContext {
97                head,
98                config: config_container.config.clone(),
99                reth_config: config_container.toml_config.clone(),
100                components: components.clone(),
101                events,
102                notifications,
103            };
104
105            let executor = components.task_executor().clone();
106            exexes.push(async move {
107                debug!(target: "reth::cli", id, "spawning exex");
108                let span = reth_tracing::tracing::info_span!("exex", id);
109
110                // init the exex
111                let exex = exex.launch(context).instrument(span.clone()).await?;
112
113                // spawn it as a crit task
114                executor.spawn_critical(
115                    "exex",
116                    async move {
117                        info!(target: "reth::cli", "ExEx started");
118                        match exex.await {
119                            Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
120                            Err(err) => panic!("ExEx {id} crashed: {err}"),
121                        }
122                    }
123                    .instrument(span),
124                );
125
126                Ok::<(), eyre::Error>(())
127            });
128        }
129
130        future::try_join_all(exexes).await?;
131
132        // spawn exex manager
133        debug!(target: "reth::cli", "spawning exex manager");
134        let exex_manager = ExExManager::new(
135            components.provider().clone(),
136            exex_handles,
137            DEFAULT_EXEX_MANAGER_CAPACITY,
138            exex_wal,
139            components.provider().finalized_block_stream(),
140        )
141        .with_wal_blocks_warning(wal_blocks_warning);
142        let exex_manager_handle = exex_manager.handle();
143        components.task_executor().spawn_critical("exex manager", async move {
144            exex_manager.await.expect("exex manager crashed");
145        });
146
147        // send notifications from the blockchain tree to exex manager
148        let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
149        let mut handle = exex_manager_handle.clone();
150        components.task_executor().spawn_critical(
151            "exex manager blockchain tree notifications",
152            async move {
153                while let Ok(notification) = canon_state_notifications.recv().await {
154                    handle
155                        .send_async(ExExNotificationSource::BlockchainTree, notification.into())
156                        .await
157                        .expect("blockchain tree notification could not be sent to exex manager");
158                }
159            },
160        );
161
162        info!(target: "reth::cli", "ExEx Manager started");
163
164        Ok(Some(exex_manager_handle))
165    }
166}
167
168impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        f.debug_struct("ExExLauncher")
171            .field("head", &self.head)
172            .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
173            .field("components", &"...")
174            .field("config_container", &self.config_container)
175            .field("wal_blocks_warning", &self.wal_blocks_warning)
176            .finish()
177    }
178}