reth_node_builder/launch/
exex.rs
1use alloy_eips::BlockNumHash;
4use futures::future;
5use reth_chain_state::ForkChoiceSubscriptions;
6use reth_chainspec::EthChainSpec;
7use reth_exex::{
8 ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
9 DEFAULT_EXEX_MANAGER_CAPACITY,
10};
11use reth_node_api::{FullNodeComponents, NodeTypes, PrimitivesTy};
12use reth_primitives::Head;
13use reth_provider::CanonStateSubscriptions;
14use reth_tracing::tracing::{debug, info};
15use std::{fmt, fmt::Debug};
16use tracing::Instrument;
17
18use crate::{common::WithConfigs, exex::BoxedLaunchExEx};
19
20pub struct ExExLauncher<Node: FullNodeComponents> {
22 head: Head,
23 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
24 components: Node,
25 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
26}
27
28impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
29 pub const fn new(
31 head: Head,
32 components: Node,
33 extensions: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
34 config_container: WithConfigs<<Node::Types as NodeTypes>::ChainSpec>,
35 ) -> Self {
36 Self { head, extensions, components, config_container }
37 }
38
39 pub async fn launch(
44 self,
45 ) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
46 let Self { head, extensions, components, config_container } = self;
47 let head = BlockNumHash::new(head.number, head.hash);
48
49 if extensions.is_empty() {
50 return Ok(None)
52 }
53
54 info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
55 let exex_wal = Wal::new(
56 config_container
57 .config
58 .datadir
59 .clone()
60 .resolve_datadir(config_container.config.chain.chain())
61 .exex_wal(),
62 )?;
63
64 let mut exex_handles = Vec::with_capacity(extensions.len());
65 let mut exexes = Vec::with_capacity(extensions.len());
66
67 for (id, exex) in extensions {
68 let (handle, events, notifications) = ExExHandle::new(
70 id.clone(),
71 head,
72 components.provider().clone(),
73 components.block_executor().clone(),
74 exex_wal.handle(),
75 );
76 exex_handles.push(handle);
77
78 let context = ExExContext {
80 head,
81 config: config_container.config.clone(),
82 reth_config: config_container.toml_config.clone(),
83 components: components.clone(),
84 events,
85 notifications,
86 };
87
88 let executor = components.task_executor().clone();
89 exexes.push(async move {
90 debug!(target: "reth::cli", id, "spawning exex");
91 let span = reth_tracing::tracing::info_span!("exex", id);
92
93 let exex = exex.launch(context).instrument(span.clone()).await.unwrap();
95
96 executor.spawn_critical(
98 "exex",
99 async move {
100 info!(target: "reth::cli", "ExEx started");
101 match exex.await {
102 Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
103 Err(err) => panic!("ExEx {id} crashed: {err}"),
104 }
105 }
106 .instrument(span),
107 );
108 });
109 }
110
111 future::join_all(exexes).await;
112
113 debug!(target: "reth::cli", "spawning exex manager");
115 let exex_manager = ExExManager::new(
116 components.provider().clone(),
117 exex_handles,
118 DEFAULT_EXEX_MANAGER_CAPACITY,
119 exex_wal,
120 components.provider().finalized_block_stream(),
121 );
122 let exex_manager_handle = exex_manager.handle();
123 components.task_executor().spawn_critical("exex manager", async move {
124 exex_manager.await.expect("exex manager crashed");
125 });
126
127 let mut canon_state_notifications = components.provider().subscribe_to_canonical_state();
129 let mut handle = exex_manager_handle.clone();
130 components.task_executor().spawn_critical(
131 "exex manager blockchain tree notifications",
132 async move {
133 while let Ok(notification) = canon_state_notifications.recv().await {
134 handle
135 .send_async(ExExNotificationSource::BlockchainTree, notification.into())
136 .await
137 .expect("blockchain tree notification could not be sent to exex manager");
138 }
139 },
140 );
141
142 info!(target: "reth::cli", "ExEx Manager started");
143
144 Ok(Some(exex_manager_handle))
145 }
146}
147
148impl<Node: FullNodeComponents> Debug for ExExLauncher<Node> {
149 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
150 f.debug_struct("ExExLauncher")
151 .field("head", &self.head)
152 .field("extensions", &self.extensions.iter().map(|(id, _)| id).collect::<Vec<_>>())
153 .field("components", &"...")
154 .field("config_container", &self.config_container)
155 .finish()
156 }
157}