reth_node_builder/launch/
exex.rs
1use alloy_eips::{eip2124::Head, BlockNumHash};
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8 ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9 DEFAULT_EXEX_MANAGER_CAPACITY,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_provider::CanonStateSubscriptions;
13use reth_tracing::tracing::{debug, info};
14use std::{fmt, fmt::Debug};
15use tracing::Instrument;
16
17use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
18
19pub struct ExExLauncher<Node: FullNodeComponents> {
21 head: Head,
22 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
23 components: Node,
24 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
25}
26
27impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
28 pub const fn new(
30 head: Head,
31 components: Node,
32 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
33 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
34 ) -> Self {
35 Self { head, extensions, components, config_container }
36 }
37
38 pub async fn launch(
43 self,
44 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
45 let Self { head, extensions, components, config_container } = self;
46 let head = BlockNumHash::new(head.number, head.hash);
47
48 if extensions.is_empty() {
49 return Ok(None)
51 }
52
53 info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
54 let exex_wal = Wal::new(
55 config_container
56 .config
57 .datadir
58 .clone()
59 .resolve_datadir(config_container.config.chain.chain())
60 .exex_wal(),
61 )?;
62
63 let mut exex_handles = Vec::with_capacity(extensions.len());
64 let mut exexes = Vec::with_capacity(extensions.len());
65
66 for (id, exex) in extensions {
67 let (handle, events, notifications) = ExExHandle::new(
69 id.clone(),
70 head,
71 components.provider().clone(),
72 components.block_executor().clone(),
73 exex_wal.handle(),
74 );
75 exex_handles.push(handle);
76
77 let context = ExExContext {
79 head,
80 config: config_container.config.clone(),
81 reth_config: config_container.toml_config.clone(),
82 components: components.clone(),
83 events,
84 notifications,
85 };
86
87 let executor = components.task_executor().clone();
88 exexes.push(async move {
89 debug!(target: "reth::cli", id, "spawning exex");
90 let span = reth_tracing::tracing::info_span!("exex", id);
91
92 let exex = exex.launch(context).instrument(span.clone()).await.unwrap();
94
95 executor.spawn_critical(
97 "exex",
98 async move {
99 info!(target: "reth::cli", "ExEx started");
100 match exex.await {
101 Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
102 Err(err) => panic!("ExEx {id} crashed: {err}"),
103 }
104 }
105 .instrument(span),
106 );
107 });
108 }
109
110 future::join_all(exexes).await;
111
112 debug!(target: "reth::cli", "spawning exex manager");
114 let exex_manager = ExExManager::new(
115 components.provider().clone(),
116 exex_handles,
117 DEFAULT_EXEX_MANAGER_CAPACITY,
118 exex_wal,
119 components.provider().finalized_block_stream(),
120 );
121 let exex_manager_handle = exex_manager.handle();
122 components.task_executor().spawn_critical("exex manager", async move {
123 exex_manager.await.expect("exex manager crashed");
124 });
125
126 let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
128 let mut handle = exex_manager_handle.clone();
129 components.task_executor().spawn_critical(
130 "exex manager blockchain tree notifications",
131 async move {
132 while let Ok(notification) = canon_state_notifications.recv().await {
133 handle
134 .send_async(ExExNotificationSource::BlockchainTree, notification.into())
135 .await
136 .expect("blockchain tree notification could not be sent to exex manager");
137 }
138 },
139 );
140
141 info!(target: "reth::cli", "ExEx Manager started");
142
143 Ok(Some(exex_manager_handle))
144 }
145}
146
147impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
148 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149 f.debug_struct("ExExLauncher")
150 .field("head", &self.head)
151 .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
152 .field("components", &"...")
153 .field("config_container", &self.config_container)
154 .finish()
155 }
156}