exex_subscription/
main.rs

1//! An ExEx example that installs a new RPC subscription endpoint that emits storage changes for a
2//! requested address.
3use alloy_primitives::{Address, U256};
4use futures::TryStreamExt;
5use jsonrpsee::{
6    core::SubscriptionResult, proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage,
7};
8use reth_ethereum::{
9    exex::{ExExContext, ExExEvent, ExExNotification},
10    node::{api::FullNodeComponents, builder::NodeHandleFor, EthereumNode},
11};
12use std::collections::HashMap;
13use tokio::sync::{mpsc, oneshot};
14use tracing::{error, info};
15
16/// Subscription update format for storage changes.
17/// This is the format that will be sent to the client when a storage change occurs.
18#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
19struct StorageDiff {
20    address: Address,
21    key: U256,
22    old_value: U256,
23    new_value: U256,
24}
25
26/// Subscription request format for storage changes.
27struct SubscriptionRequest {
28    /// The address to subscribe to.
29    address: Address,
30    /// The response channel to send the subscription updates to.
31    response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
32}
33
34/// Subscription request format for storage changes.
35type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
36
37/// API to subscribe to storage changes for a specific Ethereum address.
38#[rpc(server, namespace = "watcher")]
39pub trait StorageWatcherApi {
40    /// Subscribes to storage changes for a given Ethereum address and streams `StorageDiff`
41    /// updates.
42    #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
43    fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
44}
45
46/// API implementation for the storage watcher.
47#[derive(Clone)]
48struct StorageWatcherRpc {
49    /// The subscription sender to send subscription requests to.
50    subscriptions: SubscriptionSender,
51}
52
53impl StorageWatcherRpc {
54    /// Creates a new [`StorageWatcherRpc`] instance with the given subscription sender.
55    fn new(subscriptions: SubscriptionSender) -> Self {
56        Self { subscriptions }
57    }
58}
59
60impl StorageWatcherApiServer for StorageWatcherRpc {
61    fn subscribe_storage_changes(
62        &self,
63        pending: PendingSubscriptionSink,
64        address: Address,
65    ) -> SubscriptionResult {
66        let subscription = self.subscriptions.clone();
67
68        tokio::spawn(async move {
69            let sink = match pending.accept().await {
70                Ok(sink) => sink,
71                Err(e) => {
72                    error!("failed to accept subscription: {e}");
73                    return;
74                }
75            };
76
77            let (resp_tx, resp_rx) = oneshot::channel();
78            subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
79
80            let Ok(mut rx) = resp_rx.await else { return };
81
82            while let Some(diff) = rx.recv().await {
83                let msg = SubscriptionMessage::from(
84                    serde_json::value::to_raw_value(&diff).expect("serialize"),
85                );
86                if sink.send(msg).await.is_err() {
87                    break;
88                }
89            }
90        });
91
92        Ok(())
93    }
94}
95
96async fn my_exex<Node: FullNodeComponents>(
97    mut ctx: ExExContext<Node>,
98    mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
99) -> eyre::Result<()> {
100    let mut subscriptions: HashMap<Address, Vec<mpsc::UnboundedSender<StorageDiff>>> =
101        HashMap::new();
102
103    loop {
104        tokio::select! {
105            maybe_notification = ctx.notifications.try_next() => {
106                let notification = match maybe_notification? {
107                    Some(notification) => notification,
108                    None => break,
109                };
110
111                match &notification {
112                    ExExNotification::ChainCommitted { new } => {
113                        info!(committed_chain = ?new.range(), "Received commit");
114                        let execution_outcome = new.execution_outcome();
115
116                        for (address, senders) in subscriptions.iter_mut() {
117                            for change in &execution_outcome.bundle.state {
118                                if change.0 == address {
119                                    for (key, slot) in &change.1.storage {
120                                        let diff = StorageDiff {
121                                            address: *change.0,
122                                            key: *key,
123                                            old_value: slot.original_value(),
124                                            new_value: slot.present_value(),
125                                        };
126                                        // Send diff to all the active subscribers
127                                        senders.retain(|sender| sender.send(diff).is_ok());
128                                    }
129                                }
130                            }
131                        }
132                    }
133                    ExExNotification::ChainReorged { old, new } => {
134                        info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
135                    }
136                    ExExNotification::ChainReverted { old } => {
137                        info!(reverted_chain = ?old.range(), "Received revert");
138                    }
139                }
140
141                if let Some(committed_chain) = notification.committed_chain() {
142                    ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
143                }
144            }
145
146            maybe_subscription = subscription_requests.recv() => {
147                match maybe_subscription {
148                    Some(SubscriptionRequest { address, response }) => {
149                        let (tx, rx) = mpsc::unbounded_channel();
150                        subscriptions.entry(address).or_default().push(tx);
151                        let _ = response.send(rx);
152                    }
153                    None => {
154                        // channel closed
155                         }
156                }
157            }
158        }
159    }
160
161    Ok(())
162}
163
164fn main() -> eyre::Result<()> {
165    reth_ethereum::cli::Cli::parse_args().run(|builder, _| async move {
166        let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
167        let rpc = StorageWatcherRpc::new(subscriptions_tx);
168
169        let handle: NodeHandleFor<EthereumNode> = builder
170            .node(EthereumNode::default())
171            .extend_rpc_modules(move |ctx| {
172                ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
173                Ok(())
174            })
175            .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
176            .launch()
177            .await?;
178
179        handle.wait_for_node_exit().await
180    })
181}