// exex_subscription/main.rs
#![allow(dead_code)]
2
3#[allow(dead_code)]
6use alloy_primitives::{Address, U256};
7use clap::Parser;
8use futures::TryStreamExt;
9use jsonrpsee::{
10 core::SubscriptionResult, proc_macros::rpc, tracing, PendingSubscriptionSink,
11 SubscriptionMessage,
12};
13use reth_ethereum::{
14 exex::{ExExContext, ExExEvent, ExExNotification},
15 node::{api::FullNodeComponents, EthereumNode},
16};
17use std::collections::HashMap;
18use tokio::sync::{mpsc, oneshot};
19use tracing::{error, info};
20
/// A single storage-slot change for one account.
///
/// Values of this type are what subscribers receive: the ExEx builds one per storage
/// slot found in a committed chain's bundle state, and the RPC handler serializes it
/// as one subscription item.
#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
struct StorageDiff {
    /// Account whose storage the slot belongs to.
    address: Address,
    /// Key of the storage slot.
    key: U256,
    /// Value taken from the slot's `original_value()`.
    old_value: U256,
    /// Value taken from the slot's `present_value()`.
    new_value: U256,
}
30
/// Request sent from the RPC handler to the ExEx to register a new subscriber.
struct SubscriptionRequest {
    /// Address whose storage changes the subscriber wants to receive.
    address: Address,
    /// One-shot channel on which the ExEx hands back the receiving half of the
    /// subscriber's dedicated [`StorageDiff`] stream.
    response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
}
38
/// Sending half of the unbounded channel that carries [`SubscriptionRequest`]s to the ExEx.
type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
41
/// RPC API, under the `watcher` namespace, for subscribing to storage changes of an address.
#[rpc(server, namespace = "watcher")]
pub trait StorageWatcherApi {
    /// Opens the `subscribeStorageChanges` subscription for `address`; each item delivered
    /// to the client is a [`StorageDiff`].
    #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
    fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
}
50
/// RPC handler implementing the storage watcher API.
#[derive(Clone)]
struct StorageWatcherRpc {
    /// Channel used to send [`SubscriptionRequest`]s for new subscribers.
    subscriptions: SubscriptionSender,
}
57
58impl StorageWatcherRpc {
59 fn new(subscriptions: SubscriptionSender) -> Self {
61 Self { subscriptions }
62 }
63}
64
65impl StorageWatcherApiServer for StorageWatcherRpc {
66 fn subscribe_storage_changes(
67 &self,
68 pending: PendingSubscriptionSink,
69 address: Address,
70 ) -> SubscriptionResult {
71 let subscription = self.subscriptions.clone();
72
73 tokio::spawn(async move {
74 let sink = match pending.accept().await {
75 Ok(sink) => sink,
76 Err(e) => {
77 error!("failed to accept subscription: {e}");
78 return;
79 }
80 };
81
82 let (resp_tx, resp_rx) = oneshot::channel();
83 subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
84
85 let Ok(mut rx) = resp_rx.await else { return };
86
87 while let Some(diff) = rx.recv().await {
88 let msg = SubscriptionMessage::from(
89 serde_json::value::to_raw_value(&diff).expect("serialize"),
90 );
91 if sink.send(msg).await.is_err() {
92 break;
93 }
94 }
95 });
96
97 Ok(())
98 }
99}
100
101async fn my_exex<Node: FullNodeComponents>(
102 mut ctx: ExExContext<Node>,
103 mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
104) -> eyre::Result<()> {
105 let mut subscriptions: HashMap<Address, Vec<mpsc::UnboundedSender<StorageDiff>>> =
106 HashMap::new();
107
108 loop {
109 tokio::select! {
110 maybe_notification = ctx.notifications.try_next() => {
111 let notification = match maybe_notification? {
112 Some(notification) => notification,
113 None => break,
114 };
115
116 match ¬ification {
117 ExExNotification::ChainCommitted { new } => {
118 info!(committed_chain = ?new.range(), "Received commit");
119 let execution_outcome = new.execution_outcome();
120
121 for (address, senders) in subscriptions.iter_mut() {
122 for change in &execution_outcome.bundle.state {
123 if change.0 == address {
124 for (key, slot) in &change.1.storage {
125 let diff = StorageDiff {
126 address: *change.0,
127 key: *key,
128 old_value: slot.original_value(),
129 new_value: slot.present_value(),
130 };
131 senders.retain(|sender| sender.send(diff).is_ok());
133 }
134 }
135 }
136 }
137 }
138 ExExNotification::ChainReorged { old, new } => {
139 info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
140 }
141 ExExNotification::ChainReverted { old } => {
142 info!(reverted_chain = ?old.range(), "Received revert");
143 }
144 }
145
146 if let Some(committed_chain) = notification.committed_chain() {
147 ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
148 }
149 }
150
151 maybe_subscription = subscription_requests.recv() => {
152 match maybe_subscription {
153 Some(SubscriptionRequest { address, response }) => {
154 let (tx, rx) = mpsc::unbounded_channel();
155 subscriptions.entry(address).or_default().push(tx);
156 let _ = response.send(rx);
157 }
158 None => {
159 }
161 }
162 }
163 }
164 }
165
166 Ok(())
167}
168
// Command-line arguments declared with clap's derive API.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
// NOTE(review): nothing in the visible code parses or reads this struct; `main` goes
// through `reth_ethereum::cli::Cli::parse_args()` instead — confirm whether it is needed.
#[derive(Parser, Debug)]
struct Args {
    // Boolean `--enable-ext` style flag (long form only).
    #[arg(long)]
    enable_ext: bool,
}
174
175fn main() -> eyre::Result<()> {
176 reth_ethereum::cli::Cli::parse_args().run(|builder, _args| async move {
177 let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
178
179 let rpc = StorageWatcherRpc::new(subscriptions_tx.clone());
180
181 let handle = builder
182 .node(EthereumNode::default())
183 .extend_rpc_modules(move |ctx| {
184 ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
185 Ok(())
186 })
187 .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
188 .launch()
189 .await?;
190
191 handle.wait_for_node_exit().await
192 })
193}