Skip to main content

exex_subscription/
main.rs

1//! An ExEx example that installs a new RPC subscription endpoint that emits storage changes for a
2//! requested address.
3use alloy_primitives::{map::AddressMap, Address, U256};
4use futures::TryStreamExt;
5use jsonrpsee::{
6    core::SubscriptionResult, proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage,
7};
8use reth_ethereum::{
9    exex::{ExExContext, ExExEvent, ExExNotification},
10    node::{api::FullNodeComponents, builder::NodeHandleFor, EthereumNode},
11};
12use tokio::sync::{mpsc, oneshot};
13use tracing::{error, info};
14
15/// Subscription update format for storage changes.
16/// This is the format that will be sent to the client when a storage change occurs.
17#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
18struct StorageDiff {
19    address: Address,
20    key: U256,
21    old_value: U256,
22    new_value: U256,
23}
24
25/// Subscription request format for storage changes.
26struct SubscriptionRequest {
27    /// The address to subscribe to.
28    address: Address,
29    /// The response channel to send the subscription updates to.
30    response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
31}
32
33/// Subscription request format for storage changes.
34type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
35
36/// API to subscribe to storage changes for a specific Ethereum address.
37#[rpc(server, namespace = "watcher")]
38pub trait StorageWatcherApi {
39    /// Subscribes to storage changes for a given Ethereum address and streams `StorageDiff`
40    /// updates.
41    #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
42    fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
43}
44
45/// API implementation for the storage watcher.
46#[derive(Clone)]
47struct StorageWatcherRpc {
48    /// The subscription sender to send subscription requests to.
49    subscriptions: SubscriptionSender,
50}
51
52impl StorageWatcherRpc {
53    /// Creates a new [`StorageWatcherRpc`] instance with the given subscription sender.
54    fn new(subscriptions: SubscriptionSender) -> Self {
55        Self { subscriptions }
56    }
57}
58
59impl StorageWatcherApiServer for StorageWatcherRpc {
60    fn subscribe_storage_changes(
61        &self,
62        pending: PendingSubscriptionSink,
63        address: Address,
64    ) -> SubscriptionResult {
65        let subscription = self.subscriptions.clone();
66
67        tokio::spawn(async move {
68            let sink = match pending.accept().await {
69                Ok(sink) => sink,
70                Err(e) => {
71                    error!("failed to accept subscription: {e}");
72                    return;
73                }
74            };
75
76            let (resp_tx, resp_rx) = oneshot::channel();
77            subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
78
79            let Ok(mut rx) = resp_rx.await else { return };
80
81            while let Some(diff) = rx.recv().await {
82                let msg = SubscriptionMessage::from(
83                    serde_json::value::to_raw_value(&diff).expect("serialize"),
84                );
85                if sink.send(msg).await.is_err() {
86                    break;
87                }
88            }
89        });
90
91        Ok(())
92    }
93}
94
95async fn my_exex<Node: FullNodeComponents>(
96    mut ctx: ExExContext<Node>,
97    mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
98) -> eyre::Result<()> {
99    let mut subscriptions: AddressMap<Vec<mpsc::UnboundedSender<StorageDiff>>> =
100        AddressMap::default();
101
102    loop {
103        tokio::select! {
104            maybe_notification = ctx.notifications.try_next() => {
105                let notification = match maybe_notification? {
106                    Some(notification) => notification,
107                    None => break,
108                };
109
110                match &notification {
111                    ExExNotification::ChainCommitted { new } => {
112                        info!(committed_chain = ?new.range(), "Received commit");
113                        let execution_outcome = new.execution_outcome();
114
115                        for (address, senders) in subscriptions.iter_mut() {
116                            for change in &execution_outcome.bundle.state {
117                                if change.0 == address {
118                                    for (key, slot) in &change.1.storage {
119                                        let diff = StorageDiff {
120                                            address: *change.0,
121                                            key: *key,
122                                            old_value: slot.original_value(),
123                                            new_value: slot.present_value(),
124                                        };
125                                        // Send diff to all the active subscribers
126                                        senders.retain(|sender| sender.send(diff).is_ok());
127                                    }
128                                }
129                            }
130                        }
131                    }
132                    ExExNotification::ChainReorged { old, new } => {
133                        info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
134                    }
135                    ExExNotification::ChainReverted { old } => {
136                        info!(reverted_chain = ?old.range(), "Received revert");
137                    }
138                }
139
140                if let Some(committed_chain) = notification.committed_chain() {
141                    ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
142                }
143            }
144
145            maybe_subscription = subscription_requests.recv() => {
146                match maybe_subscription {
147                    Some(SubscriptionRequest { address, response }) => {
148                        let (tx, rx) = mpsc::unbounded_channel();
149                        subscriptions.entry(address).or_default().push(tx);
150                        let _ = response.send(rx);
151                    }
152                    None => {
153                        // channel closed
154                         }
155                }
156            }
157        }
158    }
159
160    Ok(())
161}
162
163fn main() -> eyre::Result<()> {
164    reth_ethereum::cli::Cli::parse_args().run(|builder, _| async move {
165        let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
166        let rpc = StorageWatcherRpc::new(subscriptions_tx);
167
168        let handle: NodeHandleFor<EthereumNode> = builder
169            .node(EthereumNode::default())
170            .extend_rpc_modules(move |ctx| {
171                ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
172                Ok(())
173            })
174            .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
175            .launch()
176            .await?;
177
178        handle.wait_for_node_exit().await
179    })
180}