exex_subscription/
main.rs

1#![allow(dead_code)]
2
3//! An ExEx example that installs a new RPC subscription endpoint that emits storage changes for a
4//! requested address.
5#[allow(dead_code)]
6use alloy_primitives::{Address, U256};
7use futures::TryStreamExt;
8use jsonrpsee::{
9    core::SubscriptionResult, proc_macros::rpc, tracing, PendingSubscriptionSink,
10    SubscriptionMessage,
11};
12use reth_ethereum::{
13    exex::{ExExContext, ExExEvent, ExExNotification},
14    node::{api::FullNodeComponents, builder::NodeHandleFor, EthereumNode},
15};
16use std::collections::HashMap;
17use tokio::sync::{mpsc, oneshot};
18use tracing::{error, info};
19
20/// Subscription update format for storage changes.
21/// This is the format that will be sent to the client when a storage change occurs.
22#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
23struct StorageDiff {
24    address: Address,
25    key: U256,
26    old_value: U256,
27    new_value: U256,
28}
29
30/// Subscription request format for storage changes.
31struct SubscriptionRequest {
32    /// The address to subscribe to.
33    address: Address,
34    /// The response channel to send the subscription updates to.
35    response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
36}
37
38/// Subscription request format for storage changes.
39type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
40
41/// API to subscribe to storage changes for a specific Ethereum address.
42#[rpc(server, namespace = "watcher")]
43pub trait StorageWatcherApi {
44    /// Subscribes to storage changes for a given Ethereum address and streams `StorageDiff`
45    /// updates.
46    #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
47    fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
48}
49
50/// API implementation for the storage watcher.
51#[derive(Clone)]
52struct StorageWatcherRpc {
53    /// The subscription sender to send subscription requests to.
54    subscriptions: SubscriptionSender,
55}
56
57impl StorageWatcherRpc {
58    /// Creates a new [`StorageWatcherRpc`] instance with the given subscription sender.
59    fn new(subscriptions: SubscriptionSender) -> Self {
60        Self { subscriptions }
61    }
62}
63
64impl StorageWatcherApiServer for StorageWatcherRpc {
65    fn subscribe_storage_changes(
66        &self,
67        pending: PendingSubscriptionSink,
68        address: Address,
69    ) -> SubscriptionResult {
70        let subscription = self.subscriptions.clone();
71
72        tokio::spawn(async move {
73            let sink = match pending.accept().await {
74                Ok(sink) => sink,
75                Err(e) => {
76                    error!("failed to accept subscription: {e}");
77                    return;
78                }
79            };
80
81            let (resp_tx, resp_rx) = oneshot::channel();
82            subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
83
84            let Ok(mut rx) = resp_rx.await else { return };
85
86            while let Some(diff) = rx.recv().await {
87                let msg = SubscriptionMessage::from(
88                    serde_json::value::to_raw_value(&diff).expect("serialize"),
89                );
90                if sink.send(msg).await.is_err() {
91                    break;
92                }
93            }
94        });
95
96        Ok(())
97    }
98}
99
100async fn my_exex<Node: FullNodeComponents>(
101    mut ctx: ExExContext<Node>,
102    mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
103) -> eyre::Result<()> {
104    let mut subscriptions: HashMap<Address, Vec<mpsc::UnboundedSender<StorageDiff>>> =
105        HashMap::new();
106
107    loop {
108        tokio::select! {
109            maybe_notification = ctx.notifications.try_next() => {
110                let notification = match maybe_notification? {
111                    Some(notification) => notification,
112                    None => break,
113                };
114
115                match &notification {
116                    ExExNotification::ChainCommitted { new } => {
117                        info!(committed_chain = ?new.range(), "Received commit");
118                        let execution_outcome = new.execution_outcome();
119
120                        for (address, senders) in subscriptions.iter_mut() {
121                            for change in &execution_outcome.bundle.state {
122                                if change.0 == address {
123                                    for (key, slot) in &change.1.storage {
124                                        let diff = StorageDiff {
125                                            address: *change.0,
126                                            key: *key,
127                                            old_value: slot.original_value(),
128                                            new_value: slot.present_value(),
129                                        };
130                                        // Send diff to all the active subscribers
131                                        senders.retain(|sender| sender.send(diff).is_ok());
132                                    }
133                                }
134                            }
135                        }
136                    }
137                    ExExNotification::ChainReorged { old, new } => {
138                        info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
139                    }
140                    ExExNotification::ChainReverted { old } => {
141                        info!(reverted_chain = ?old.range(), "Received revert");
142                    }
143                }
144
145                if let Some(committed_chain) = notification.committed_chain() {
146                    ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
147                }
148            }
149
150            maybe_subscription = subscription_requests.recv() => {
151                match maybe_subscription {
152                    Some(SubscriptionRequest { address, response }) => {
153                        let (tx, rx) = mpsc::unbounded_channel();
154                        subscriptions.entry(address).or_default().push(tx);
155                        let _ = response.send(rx);
156                    }
157                    None => {
158                        // channel closed
159                         }
160                }
161            }
162        }
163    }
164
165    Ok(())
166}
167
168fn main() -> eyre::Result<()> {
169    reth_ethereum::cli::Cli::parse_args().run(|builder, _| async move {
170        let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
171
172        let rpc = StorageWatcherRpc::new(subscriptions_tx.clone());
173
174        let handle: NodeHandleFor<EthereumNode> = builder
175            .node(EthereumNode::default())
176            .extend_rpc_modules(move |ctx| {
177                ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
178                Ok(())
179            })
180            .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
181            .launch()
182            .await?;
183
184        handle.wait_for_node_exit().await
185    })
186}