exex_subscription/
main.rs

1#![allow(dead_code)]
2
3//! An ExEx example that installs a new RPC subscription endpoint that emit storage changes for a
4//! requested address.
5#[allow(dead_code)]
6use alloy_primitives::{Address, U256};
7use clap::Parser;
8use futures::TryStreamExt;
9use jsonrpsee::{
10    core::SubscriptionResult, proc_macros::rpc, tracing, PendingSubscriptionSink,
11    SubscriptionMessage,
12};
13use reth_ethereum::{
14    exex::{ExExContext, ExExEvent, ExExNotification},
15    node::{api::FullNodeComponents, EthereumNode},
16};
17use std::collections::HashMap;
18use tokio::sync::{mpsc, oneshot};
19use tracing::{error, info};
20
21/// Subscription update format for storage changes.
22/// This is the format that will be sent to the client when a storage change occurs.
23#[derive(Debug, Clone, Copy, Default, serde::Serialize)]
24struct StorageDiff {
25    address: Address,
26    key: U256,
27    old_value: U256,
28    new_value: U256,
29}
30
31/// Subscription request format for storage changes.
32struct SubscriptionRequest {
33    /// The address to subscribe to.
34    address: Address,
35    /// The response channel to send the subscription updates to.
36    response: oneshot::Sender<mpsc::UnboundedReceiver<StorageDiff>>,
37}
38
39/// Subscription request format for storage changes.
40type SubscriptionSender = mpsc::UnboundedSender<SubscriptionRequest>;
41
42/// API to subscribe to storage changes for a specific Ethereum address.
43#[rpc(server, namespace = "watcher")]
44pub trait StorageWatcherApi {
45    /// Subscribes to storage changes for a given Ethereum address and streams `StorageDiff`
46    /// updates.
47    #[subscription(name = "subscribeStorageChanges", item = StorageDiff)]
48    fn subscribe_storage_changes(&self, address: Address) -> SubscriptionResult;
49}
50
51/// API implementation for the storage watcher.
52#[derive(Clone)]
53struct StorageWatcherRpc {
54    /// The subscription sender to send subscription requests to.
55    subscriptions: SubscriptionSender,
56}
57
58impl StorageWatcherRpc {
59    /// Creates a new [`StorageWatcherRpc`] instance with the given subscription sender.
60    fn new(subscriptions: SubscriptionSender) -> Self {
61        Self { subscriptions }
62    }
63}
64
65impl StorageWatcherApiServer for StorageWatcherRpc {
66    fn subscribe_storage_changes(
67        &self,
68        pending: PendingSubscriptionSink,
69        address: Address,
70    ) -> SubscriptionResult {
71        let subscription = self.subscriptions.clone();
72
73        tokio::spawn(async move {
74            let sink = match pending.accept().await {
75                Ok(sink) => sink,
76                Err(e) => {
77                    error!("failed to accept subscription: {e}");
78                    return;
79                }
80            };
81
82            let (resp_tx, resp_rx) = oneshot::channel();
83            subscription.send(SubscriptionRequest { address, response: resp_tx }).unwrap();
84
85            let Ok(mut rx) = resp_rx.await else { return };
86
87            while let Some(diff) = rx.recv().await {
88                let msg = SubscriptionMessage::from(
89                    serde_json::value::to_raw_value(&diff).expect("serialize"),
90                );
91                if sink.send(msg).await.is_err() {
92                    break;
93                }
94            }
95        });
96
97        Ok(())
98    }
99}
100
101async fn my_exex<Node: FullNodeComponents>(
102    mut ctx: ExExContext<Node>,
103    mut subscription_requests: mpsc::UnboundedReceiver<SubscriptionRequest>,
104) -> eyre::Result<()> {
105    let mut subscriptions: HashMap<Address, Vec<mpsc::UnboundedSender<StorageDiff>>> =
106        HashMap::new();
107
108    loop {
109        tokio::select! {
110            maybe_notification = ctx.notifications.try_next() => {
111                let notification = match maybe_notification? {
112                    Some(notification) => notification,
113                    None => break,
114                };
115
116                match &notification {
117                    ExExNotification::ChainCommitted { new } => {
118                        info!(committed_chain = ?new.range(), "Received commit");
119                        let execution_outcome = new.execution_outcome();
120
121                        for (address, senders) in subscriptions.iter_mut() {
122                            for change in &execution_outcome.bundle.state {
123                                if change.0 == address {
124                                    for (key, slot) in &change.1.storage {
125                                        let diff = StorageDiff {
126                                            address: *change.0,
127                                            key: *key,
128                                            old_value: slot.original_value(),
129                                            new_value: slot.present_value(),
130                                        };
131                                        // Send diff to all the active subscribers
132                                        senders.retain(|sender| sender.send(diff).is_ok());
133                                    }
134                                }
135                            }
136                        }
137                    }
138                    ExExNotification::ChainReorged { old, new } => {
139                        info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
140                    }
141                    ExExNotification::ChainReverted { old } => {
142                        info!(reverted_chain = ?old.range(), "Received revert");
143                    }
144                }
145
146                if let Some(committed_chain) = notification.committed_chain() {
147                    ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))?;
148                }
149            }
150
151            maybe_subscription = subscription_requests.recv() => {
152                match maybe_subscription {
153                    Some(SubscriptionRequest { address, response }) => {
154                        let (tx, rx) = mpsc::unbounded_channel();
155                        subscriptions.entry(address).or_default().push(tx);
156                        let _ = response.send(rx);
157                    }
158                    None => {
159                        // channel closed
160                         }
161                }
162            }
163        }
164    }
165
166    Ok(())
167}
168
169#[derive(Parser, Debug)]
170struct Args {
171    #[arg(long)]
172    enable_ext: bool,
173}
174
175fn main() -> eyre::Result<()> {
176    reth_ethereum::cli::Cli::parse_args().run(|builder, _args| async move {
177        let (subscriptions_tx, subscriptions_rx) = mpsc::unbounded_channel::<SubscriptionRequest>();
178
179        let rpc = StorageWatcherRpc::new(subscriptions_tx.clone());
180
181        let handle = builder
182            .node(EthereumNode::default())
183            .extend_rpc_modules(move |ctx| {
184                ctx.modules.merge_configured(StorageWatcherApiServer::into_rpc(rpc))?;
185                Ok(())
186            })
187            .install_exex("my-exex", async move |ctx| Ok(my_exex(ctx, subscriptions_rx)))
188            .launch()
189            .await?;
190
191        handle.wait_for_node_exit().await
192    })
193}