exex_subscription/
main.rs1use alloy_primitives::{map::AddressMap, Address, U256};
4use futures::TryStreamExt;
5use jsonrpsee::{
6 core::SubscriptionResult, proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage,
7};
8use reth_ethereum::{
9 exex::{ExExContext, ExExEvent, ExExNotification},
10 node::{api::FullNodeComponents, builder::NodeHandleFor, EthereumNode},
11};
12use tokio::sync::{mpsc, oneshot};
13use tracing::{error, info};
14
15#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
18struct StorageDiff {
19 address: Address,
20 key: U256,
21 old_value: U256,
22 new_value: U256,
23}
24
25struct SubscriptionRequest {
27 address: Address,
29 response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
31}
32
33type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
35
36#[rpc(server, namespace = "watcher")]
38pub trait StorageWatcherApi {
39 #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
42 fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
43}
44
45#[derive(Clone)]
47struct StorageWatcherRpc {
48 subscriptions: SubscriptionSender,
50}
51
52impl StorageWatcherRpc {
53 fn new(subscriptions: SubscriptionSender) -> Self {
55 Self { subscriptions }
56 }
57}
58
59impl StorageWatcherApiServer for StorageWatcherRpc {
60 fn subscribe_storage_changes(
61 &self,
62 pending: PendingSubscriptionSink,
63 address: Address,
64 ) -> SubscriptionResult {
65 let subscription = self.subscriptions.clone();
66
67 tokio::spawn(async move {
68 let sink = match pending.accept().await {
69 Ok(sink) => sink,
70 Err(e) => {
71 error!("failed to accept subscription: {e}");
72 return;
73 }
74 };
75
76 let (resp_tx, resp_rx) = oneshot::channel();
77 subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
78
79 let Ok(mut rx) = resp_rx.await else { return };
80
81 while let Some(diff) = rx.recv().await {
82 let msg = SubscriptionMessage::from(
83 serde_json::value::to_raw_value(&diff).expect("serialize"),
84 );
85 if sink.send(msg).await.is_err() {
86 break;
87 }
88 }
89 });
90
91 Ok(())
92 }
93}
94
95async fn my_exex<Node: FullNodeComponents>(
96 mut ctx: ExExContext<Node>,
97 mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
98) -> eyre::Result<()> {
99 let mut subscriptions: AddressMap<Vec<mpsc::UnboundedSender<StorageDiff>>> =
100 AddressMap::default();
101
102 loop {
103 tokio::select! {
104 maybe_notification = ctx.notifications.try_next() => {
105 let notification = match maybe_notification? {
106 Some(notification) => notification,
107 None => break,
108 };
109
110 match ¬ification {
111 ExExNotification::ChainCommitted { new } => {
112 info!(committed_chain = ?new.range(), "Received commit");
113 let execution_outcome = new.execution_outcome();
114
115 for (address, senders) in subscriptions.iter_mut() {
116 for change in &execution_outcome.bundle.state {
117 if change.0 == address {
118 for (key, slot) in &change.1.storage {
119 let diff = StorageDiff {
120 address: *change.0,
121 key: *key,
122 old_value: slot.original_value(),
123 new_value: slot.present_value(),
124 };
125 senders.retain(|sender| sender.send(diff).is_ok());
127 }
128 }
129 }
130 }
131 }
132 ExExNotification::ChainReorged { old, new } => {
133 info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
134 }
135 ExExNotification::ChainReverted { old } => {
136 info!(reverted_chain = ?old.range(), "Received revert");
137 }
138 }
139
140 if let Some(committed_chain) = notification.committed_chain() {
141 ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
142 }
143 }
144
145 maybe_subscription = subscription_requests.recv() => {
146 match maybe_subscription {
147 Some(SubscriptionRequest { address, response }) => {
148 let (tx, rx) = mpsc::unbounded_channel();
149 subscriptions.entry(address).or_default().push(tx);
150 let _ = response.send(rx);
151 }
152 None => {
153 }
155 }
156 }
157 }
158 }
159
160 Ok(())
161}
162
163fn main() -> eyre::Result<()> {
164 reth_ethereum::cli::Cli::parse_args().run(|builder, _| async move {
165 let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
166 let rpc = StorageWatcherRpc::new(subscriptions_tx);
167
168 let handle: NodeHandleFor<EthereumNode> = builder
169 .node(EthereumNode::default())
170 .extend_rpc_modules(move |ctx| {
171 ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
172 Ok(())
173 })
174 .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
175 .launch()
176 .await?;
177
178 handle.wait_for_node_exit().await
179 })
180}