reth_node_builder/launch/
exex.rs1use alloy_eips::{eip2124::Head, BlockNumHash};
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8 ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9 DEFAULT_EXEX_MANAGER_CAPACITY, DEFAULT_WAL_BLOCKS_WARNING,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_provider::CanonStateSubscriptions;
13use reth_tracing::tracing::{debug, info};
14use std::{fmt, fmt::Debug};
15use tracing::Instrument;
16
17use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
18
19pub struct ExExLauncher<Node: FullNodeComponents> {
21 head: Head,
22 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
23 components: Node,
24 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
25 wal_blocks_warning: usize,
27 capacity: usize,
29}
30
31impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
32 pub const fn new(
34 head: Head,
35 components: Node,
36 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
37 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
38 ) -> Self {
39 Self {
40 head,
41 extensions,
42 components,
43 config_container,
44 wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
45 capacity: DEFAULT_EXEX_MANAGER_CAPACITY,
46 }
47 }
48
49 pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
55 self.wal_blocks_warning = threshold;
56 self
57 }
58
59 pub const fn with_capacity(mut self, capacity: usize) -> Self {
61 self.capacity = capacity;
62 self
63 }
64
65 pub async fn launch(
70 self,
71 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
72 let Self { head, extensions, components, config_container, wal_blocks_warning, capacity } =
73 self;
74 let head = BlockNumHash::new(head.number, head.hash);
75
76 if extensions.is_empty() {
77 return Ok(None)
79 }
80
81 info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
82 let exex_wal = Wal::new(
83 config_container
84 .config
85 .datadir
86 .clone()
87 .resolve_datadir(config_container.config.chain.chain())
88 .exex_wal(),
89 )?;
90
91 let mut exex_handles = Vec::with_capacity(extensions.len());
92 let mut exexes = Vec::with_capacity(extensions.len());
93
94 for (id, exex) in extensions {
95 let (handle, events, notifications) = ExExHandle::new(
97 id.clone(),
98 head,
99 components.provider().clone(),
100 components.evm_config().clone(),
101 exex_wal.handle(),
102 );
103 exex_handles.push(handle);
104
105 let context = ExExContext {
107 head,
108 config: config_container.config.clone(),
109 reth_config: config_container.toml_config.clone(),
110 components: components.clone(),
111 events,
112 notifications,
113 };
114
115 let executor = components.task_executor().clone();
116 exexes.push(async move {
117 debug!(target: "reth::cli", id, "spawning exex");
118 let span = reth_tracing::tracing::info_span!("exex", id);
119
120 let exex = exex.launch(context).instrument(span.clone()).await?;
122
123 executor.spawn_critical_task(
125 "exex",
126 async move {
127 info!(target: "reth::cli", "ExEx started");
128 match exex.await {
129 Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
130 Err(err) => panic!("ExEx {id} crashed: {err}"),
131 }
132 }
133 .instrument(span),
134 );
135
136 Ok::<(), eyre::Error>(())
137 });
138 }
139
140 future::try_join_all(exexes).await?;
141
142 debug!(target: "reth::cli", "spawning exex manager");
144 let exex_manager = ExExManager::new(
145 components.provider().clone(),
146 exex_handles,
147 capacity,
148 exex_wal,
149 components.provider().finalized_block_stream(),
150 )
151 .with_wal_blocks_warning(wal_blocks_warning);
152 let exex_manager_handle = exex_manager.handle();
153 components.task_executor().spawn_critical_task("exex manager", async move {
154 exex_manager.await.expect("exex manager crashed");
155 });
156
157 let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
159 let mut handle = exex_manager_handle.clone();
160 components.task_executor().spawn_critical_task(
161 "exex manager blockchain tree notifications",
162 async move {
163 while let Ok(notification) = canon_state_notifications.recv().await {
164 handle
165 .send_async(ExExNotificationSource::BlockchainTree, notification.into())
166 .await
167 .expect("blockchain tree notification could not be sent to exex manager");
168 }
169 },
170 );
171
172 info!(target: "reth::cli", "ExEx Manager started");
173
174 Ok(Some(exex_manager_handle))
175 }
176}
177
178impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 f.debug_struct("ExExLauncher")
181 .field("head", &self.head)
182 .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
183 .field("components", &"...")
184 .field("config_container", &self.config_container)
185 .field("wal_blocks_warning", &self.wal_blocks_warning)
186 .finish()
187 }
188}