reth_node_builder/launch/
exex.rs1use alloy_eips::{eip2124::Head, BlockNumHash};
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8 ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9 DEFAULT_EXEX_MANAGER_CAPACITY, DEFAULT_WAL_BLOCKS_WARNING,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_provider::CanonStateSubscriptions;
13use reth_tracing::tracing::{debug, info};
14use std::{fmt, fmt::Debug};
15use tracing::Instrument;
16
17use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
18
19pub struct ExExLauncher<Node: FullNodeComponents> {
21 head: Head,
22 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
23 components: Node,
24 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
25 wal_blocks_warning: usize,
27}
28
29impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
30 pub const fn new(
32 head: Head,
33 components: Node,
34 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
35 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
36 ) -> Self {
37 Self {
38 head,
39 extensions,
40 components,
41 config_container,
42 wal_blocks_warning: DEFAULT_WAL_BLOCKS_WARNING,
43 }
44 }
45
46 pub const fn with_wal_blocks_warning(mut self, threshold: usize) -> Self {
52 self.wal_blocks_warning = threshold;
53 self
54 }
55
56 pub async fn launch(
61 self,
62 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
63 let Self { head, extensions, components, config_container, wal_blocks_warning } = self;
64 let head = BlockNumHash::new(head.number, head.hash);
65
66 if extensions.is_empty() {
67 return Ok(None)
69 }
70
71 info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
72 let exex_wal = Wal::new(
73 config_container
74 .config
75 .datadir
76 .clone()
77 .resolve_datadir(config_container.config.chain.chain())
78 .exex_wal(),
79 )?;
80
81 let mut exex_handles = Vec::with_capacity(extensions.len());
82 let mut exexes = Vec::with_capacity(extensions.len());
83
84 for (id, exex) in extensions {
85 let (handle, events, notifications) = ExExHandle::new(
87 id.clone(),
88 head,
89 components.provider().clone(),
90 components.evm_config().clone(),
91 exex_wal.handle(),
92 );
93 exex_handles.push(handle);
94
95 let context = ExExContext {
97 head,
98 config: config_container.config.clone(),
99 reth_config: config_container.toml_config.clone(),
100 components: components.clone(),
101 events,
102 notifications,
103 };
104
105 let executor = components.task_executor().clone();
106 exexes.push(async move {
107 debug!(target: "reth::cli", id, "spawning exex");
108 let span = reth_tracing::tracing::info_span!("exex", id);
109
110 let exex = exex.launch(context).instrument(span.clone()).await?;
112
113 executor.spawn_critical(
115 "exex",
116 async move {
117 info!(target: "reth::cli", "ExEx started");
118 match exex.await {
119 Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
120 Err(err) => panic!("ExEx {id} crashed: {err}"),
121 }
122 }
123 .instrument(span),
124 );
125
126 Ok::<(), eyre::Error>(())
127 });
128 }
129
130 future::try_join_all(exexes).await?;
131
132 debug!(target: "reth::cli", "spawning exex manager");
134 let exex_manager = ExExManager::new(
135 components.provider().clone(),
136 exex_handles,
137 DEFAULT_EXEX_MANAGER_CAPACITY,
138 exex_wal,
139 components.provider().finalized_block_stream(),
140 )
141 .with_wal_blocks_warning(wal_blocks_warning);
142 let exex_manager_handle = exex_manager.handle();
143 components.task_executor().spawn_critical("exex manager", async move {
144 exex_manager.await.expect("exex manager crashed");
145 });
146
147 let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
149 let mut handle = exex_manager_handle.clone();
150 components.task_executor().spawn_critical(
151 "exex manager blockchain tree notifications",
152 async move {
153 while let Ok(notification) = canon_state_notifications.recv().await {
154 handle
155 .send_async(ExExNotificationSource::BlockchainTree, notification.into())
156 .await
157 .expect("blockchain tree notification could not be sent to exex manager");
158 }
159 },
160 );
161
162 info!(target: "reth::cli", "ExEx Manager started");
163
164 Ok(Some(exex_manager_handle))
165 }
166}
167
168impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 f.debug_struct("ExExLauncher")
171 .field("head", &self.head)
172 .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
173 .field("components", &"...")
174 .field("config_container", &self.config_container)
175 .field("wal_blocks_warning", &self.wal_blocks_warning)
176 .finish()
177 }
178}