exex_subscription/
main.rs1use alloy_primitives::{Address, U256};
4use futures::TryStreamExt;
5use jsonrpsee::{
6 core::SubscriptionResult, proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage,
7};
8use reth_ethereum::{
9 exex::{ExExContext, ExExEvent, ExExNotification},
10 node::{api::FullNodeComponents, builder::NodeHandleFor, EthereumNode},
11};
12use std::collections::HashMap;
13use tokio::sync::{mpsc, oneshot};
14use tracing::{error, info};
15
16#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
19struct StorageDiff {
20 address: Address,
21 key: U256,
22 old_value: U256,
23 new_value: U256,
24}
25
26struct SubscriptionRequest {
28 address: Address,
30 response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
32}
33
34type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
36
37#[rpc(server, namespace = "watcher")]
39pub trait StorageWatcherApi {
40 #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
43 fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
44}
45
46#[derive(Clone)]
48struct StorageWatcherRpc {
49 subscriptions: SubscriptionSender,
51}
52
53impl StorageWatcherRpc {
54 fn new(subscriptions: SubscriptionSender) -> Self {
56 Self { subscriptions }
57 }
58}
59
60impl StorageWatcherApiServer for StorageWatcherRpc {
61 fn subscribe_storage_changes(
62 &self,
63 pending: PendingSubscriptionSink,
64 address: Address,
65 ) -> SubscriptionResult {
66 let subscription = self.subscriptions.clone();
67
68 tokio::spawn(async move {
69 let sink = match pending.accept().await {
70 Ok(sink) => sink,
71 Err(e) => {
72 error!("failed to accept subscription: {e}");
73 return;
74 }
75 };
76
77 let (resp_tx, resp_rx) = oneshot::channel();
78 subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
79
80 let Ok(mut rx) = resp_rx.await else { return };
81
82 while let Some(diff) = rx.recv().await {
83 let msg = SubscriptionMessage::from(
84 serde_json::value::to_raw_value(&diff).expect("serialize"),
85 );
86 if sink.send(msg).await.is_err() {
87 break;
88 }
89 }
90 });
91
92 Ok(())
93 }
94}
95
96async fn my_exex<Node: FullNodeComponents>(
97 mut ctx: ExExContext<Node>,
98 mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
99) -> eyre::Result<()> {
100 let mut subscriptions: HashMap<Address, Vec<mpsc::UnboundedSender<StorageDiff>>> =
101 HashMap::new();
102
103 loop {
104 tokio::select! {
105 maybe_notification = ctx.notifications.try_next() => {
106 let notification = match maybe_notification? {
107 Some(notification) => notification,
108 None => break,
109 };
110
111 match ¬ification {
112 ExExNotification::ChainCommitted { new } => {
113 info!(committed_chain = ?new.range(), "Received commit");
114 let execution_outcome = new.execution_outcome();
115
116 for (address, senders) in subscriptions.iter_mut() {
117 for change in &execution_outcome.bundle.state {
118 if change.0 == address {
119 for (key, slot) in &change.1.storage {
120 let diff = StorageDiff {
121 address: *change.0,
122 key: *key,
123 old_value: slot.original_value(),
124 new_value: slot.present_value(),
125 };
126 senders.retain(|sender| sender.send(diff).is_ok());
128 }
129 }
130 }
131 }
132 }
133 ExExNotification::ChainReorged { old, new } => {
134 info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
135 }
136 ExExNotification::ChainReverted { old } => {
137 info!(reverted_chain = ?old.range(), "Received revert");
138 }
139 }
140
141 if let Some(committed_chain) = notification.committed_chain() {
142 ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
143 }
144 }
145
146 maybe_subscription = subscription_requests.recv() => {
147 match maybe_subscription {
148 Some(SubscriptionRequest { address, response }) => {
149 let (tx, rx) = mpsc::unbounded_channel();
150 subscriptions.entry(address).or_default().push(tx);
151 let _ = response.send(rx);
152 }
153 None => {
154 }
156 }
157 }
158 }
159 }
160
161 Ok(())
162}
163
164fn main() -> eyre::Result<()> {
165 reth_ethereum::cli::Cli::parse_args().run(|builder, _| async move {
166 let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
167 let rpc = StorageWatcherRpc::new(subscriptions_tx);
168
169 let handle: NodeHandleFor<EthereumNode> = builder
170 .node(EthereumNode::default())
171 .extend_rpc_modules(move |ctx| {
172 ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
173 Ok(())
174 })
175 .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
176 .launch()
177 .await?;
178
179 handle.wait_for_node_exit().await
180 })
181}