Skip to main content

reth_node_builder/launch/
exex.rs

1//! Support for launching execution extensions.
2
3use alloy_eips::{eip2124::Head, BlockNumHash};
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8    ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9    DEFAULT_EXEX_MANAGER_CAPACITY, DEFAULT_WAL_BLOCKS_WARNING,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_provider::CanonStateSubscriptions;
13use reth_tracing::tracing::{debug, info};
14use std::{fmt, fmt::Debug};
15use tracing::Instrument;
16
17use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
18
19/// Can launch execution extensions.
20pub struct ExExLauncher<Node: FullNodeComponents> {
21    head: Head,
22    extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
23    components: Node,
24    config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
25    /// The threshold for the number of blocks in the WAL before emitting a warning.
26    wal_blocks_warning: usize,
27    /// The max notification buffer capacity for the ExEx manager.
28    capacity: usize,
29}
30
31impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
32    /// Create a new `ExExLauncher` with the given extensions.
33    pub const fn new(
34        head: Head,
35        components: Node,
36        extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
37        config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
38    ) -> Self {
39        Self {
40            head,
41            extensions,
42            components,
43            config_container,
44            wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
45            capacity: DEFAULT_EXEX_MANAGER_CAPACITY,
46        }
47    }
48
49    /// Sets the threshold for the number of blocks in the WAL before emitting a warning.
50    ///
51    /// For L2 chains with faster block times, this value should be increased proportionally
52    /// to avoid excessive warnings. For example, a chain with 2-second block times might use
53    /// a value 6x higher than the default (768 instead of 128).
54    pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
55        self.wal_blocks_warning = threshold;
56        self
57    }
58
59    /// Sets the max notification buffer capacity for the [`ExExManager`].
60    pub const fn with_capacity(mut self, capacity: usize) -> Self {
61        self.capacity = capacity;
62        self
63    }
64
65    /// Launches all execution extensions.
66    ///
67    /// Spawns all extensions and returns the handle to the exex manager if any extensions are
68    /// installed.
69    pub async fn launch(
70        self,
71    ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
72        let Self { head, extensions, components, config_container, wal_blocks_warning, capacity } =
73            self;
74        let head = BlockNumHash::new(head.number, head.hash);
75
76        if extensions.is_empty() {
77            // nothing to launch
78            return Ok(None)
79        }
80
81        info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
82        let exex_wal = Wal::new(
83            config_container
84                .config
85                .datadir
86                .clone()
87                .resolve_datadir(config_container.config.chain.chain())
88                .exex_wal(),
89        )?;
90
91        let mut exex_handles = Vec::with_capacity(extensions.len());
92        let mut exexes = Vec::with_capacity(extensions.len());
93
94        for (id, exex) in extensions {
95            // create a new exex handle
96            let (handle, events, notifications) = ExExHandle::new(
97                id.clone(),
98                head,
99                components.provider().clone(),
100                components.evm_config().clone(),
101                exex_wal.handle(),
102            );
103            exex_handles.push(handle);
104
105            // create the launch context for the exex
106            let context = ExExContext {
107                head,
108                config: config_container.config.clone(),
109                reth_config: config_container.toml_config.clone(),
110                components: components.clone(),
111                events,
112                notifications,
113            };
114
115            let executor = components.task_executor().clone();
116            exexes.push(async move {
117                debug!(target: "reth::cli", id, "spawning exex");
118                let span = reth_tracing::tracing::info_span!("exex", id);
119
120                // init the exex
121                let exex = exex.launch(context).instrument(span.clone()).await?;
122
123                // spawn it as a crit task
124                executor.spawn_critical_task(
125                    "exex",
126                    async move {
127                        info!(target: "reth::cli", "ExEx started");
128                        match exex.await {
129                            Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
130                            Err(err) => panic!("ExEx {id} crashed: {err}"),
131                        }
132                    }
133                    .instrument(span),
134                );
135
136                Ok::<(), eyre::Error>(())
137            });
138        }
139
140        future::try_join_all(exexes).await?;
141
142        // spawn exex manager
143        debug!(target: "reth::cli", "spawning exex manager");
144        let exex_manager = ExExManager::new(
145            components.provider().clone(),
146            exex_handles,
147            capacity,
148            exex_wal,
149            components.provider().finalized_block_stream(),
150        )
151        .with_wal_blocks_warning(wal_blocks_warning);
152        let exex_manager_handle = exex_manager.handle();
153        components.task_executor().spawn_critical_task("exex manager", async move {
154            exex_manager.await.expect("exex manager crashed");
155        });
156
157        // send notifications from the blockchain tree to exex manager
158        let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
159        let mut handle = exex_manager_handle.clone();
160        components.task_executor().spawn_critical_task(
161            "exex manager blockchain tree notifications",
162            async move {
163                while let Ok(notification) = canon_state_notifications.recv().await {
164                    handle
165                        .send_async(ExExNotificationSource::BlockchainTree, notification.into())
166                        .await
167                        .expect("blockchain tree notification could not be sent to exex manager");
168                }
169            },
170        );
171
172        info!(target: "reth::cli", "ExEx Manager started");
173
174        Ok(Some(exex_manager_handle))
175    }
176}
177
178impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("ExExLauncher")
181            .field("head", &self.head)
182            .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
183            .field("components", &"...")
184            .field("config_container", &self.config_container)
185            .field("wal_blocks_warning", &self.wal_blocks_warning)
186            .finish()
187    }
188}