exex_subscription/
main.rs1#![allow(dead_code)]
2
3#[allow(dead_code)]
6use alloy_primitives::{Address, U256};
7use futures::TryStreamExt;
8use jsonrpsee::{
9 core::SubscriptionResult, proc_macros::rpc, tracing, PendingSubscriptionSink,
10 SubscriptionMessage,
11};
12use reth_ethereum::{
13 exex::{ExExContext, ExExEvent, ExExNotification},
14 node::{api::FullNodeComponents, builder::NodeHandleFor, EthereumNode},
15};
16use std::collections::HashMap;
17use tokio::sync::{mpsc, oneshot};
18use tracing::{error, info};
19
20#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
23struct StorageDiff {
24 address: Address,
25 key: U256,
26 old_value: U256,
27 new_value: U256,
28}
29
30struct SubscriptionRequest {
32 address: Address,
34 response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
36}
37
38type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
40
41#[rpc(server, namespace = "watcher")]
43pub trait StorageWatcherApi {
44 #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
47 fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
48}
49
50#[derive(Clone)]
52struct StorageWatcherRpc {
53 subscriptions: SubscriptionSender,
55}
56
57impl StorageWatcherRpc {
58 fn new(subscriptions: SubscriptionSender) -> Self {
60 Self { subscriptions }
61 }
62}
63
64impl StorageWatcherApiServer for StorageWatcherRpc {
65 fn subscribe_storage_changes(
66 &self,
67 pending: PendingSubscriptionSink,
68 address: Address,
69 ) -> SubscriptionResult {
70 let subscription = self.subscriptions.clone();
71
72 tokio::spawn(async move {
73 let sink = match pending.accept().await {
74 Ok(sink) => sink,
75 Err(e) => {
76 error!("failed to accept subscription: {e}");
77 return;
78 }
79 };
80
81 let (resp_tx, resp_rx) = oneshot::channel();
82 subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
83
84 let Ok(mut rx) = resp_rx.await else { return };
85
86 while let Some(diff) = rx.recv().await {
87 let msg = SubscriptionMessage::from(
88 serde_json::value::to_raw_value(&diff).expect("serialize"),
89 );
90 if sink.send(msg).await.is_err() {
91 break;
92 }
93 }
94 });
95
96 Ok(())
97 }
98}
99
100async fn my_exex<Node: FullNodeComponents>(
101 mut ctx: ExExContext<Node>,
102 mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
103) -> eyre::Result<()> {
104 let mut subscriptions: HashMap<Address, Vec<mpsc::UnboundedSender<StorageDiff>>> =
105 HashMap::new();
106
107 loop {
108 tokio::select! {
109 maybe_notification = ctx.notifications.try_next() => {
110 let notification = match maybe_notification? {
111 Some(notification) => notification,
112 None => break,
113 };
114
115 match ¬ification {
116 ExExNotification::ChainCommitted { new } => {
117 info!(committed_chain = ?new.range(), "Received commit");
118 let execution_outcome = new.execution_outcome();
119
120 for (address, senders) in subscriptions.iter_mut() {
121 for change in &execution_outcome.bundle.state {
122 if change.0 == address {
123 for (key, slot) in &change.1.storage {
124 let diff = StorageDiff {
125 address: *change.0,
126 key: *key,
127 old_value: slot.original_value(),
128 new_value: slot.present_value(),
129 };
130 senders.retain(|sender| sender.send(diff).is_ok());
132 }
133 }
134 }
135 }
136 }
137 ExExNotification::ChainReorged { old, new } => {
138 info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
139 }
140 ExExNotification::ChainReverted { old } => {
141 info!(reverted_chain = ?old.range(), "Received revert");
142 }
143 }
144
145 if let Some(committed_chain) = notification.committed_chain() {
146 ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
147 }
148 }
149
150 maybe_subscription = subscription_requests.recv() => {
151 match maybe_subscription {
152 Some(SubscriptionRequest { address, response }) => {
153 let (tx, rx) = mpsc::unbounded_channel();
154 subscriptions.entry(address).or_default().push(tx);
155 let _ = response.send(rx);
156 }
157 None => {
158 }
160 }
161 }
162 }
163 }
164
165 Ok(())
166}
167
168fn main() -> eyre::Result<()> {
169 reth_ethereum::cli::Cli::parse_args().run(|builder, _| async move {
170 let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
171
172 let rpc = StorageWatcherRpc::new(subscriptions_tx.clone());
173
174 let handle: NodeHandleFor<EthereumNode> = builder
175 .node(EthereumNode::default())
176 .extend_rpc_modules(move |ctx| {
177 ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
178 Ok(())
179 })
180 .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
181 .launch()
182 .await?;
183
184 handle.wait_for_node_exit().await
185 })
186}