1use std::{future::Future, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockId;
5use alloy_primitives::{map::AddressMap, U256};
6use async_trait::async_trait;
7use futures::{Stream, StreamExt};
8use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
9use reth_chain_state::{
10 CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
11 PersistedBlockSubscriptions,
12};
13use reth_errors::RethResult;
14use reth_primitives_traits::{NodePrimitives, SealedHeader};
15use reth_rpc_api::RethApiServer;
16use reth_rpc_eth_types::{EthApiError, EthResult};
17use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
18use reth_tasks::TaskSpawner;
19use serde::Serialize;
20use tokio::sync::oneshot;
21
/// Handler type that implements [`RethApiServer`].
///
/// All state lives behind an [`Arc`], so cloning a `RethApi` only clones the pointer and every
/// clone shares the same provider and task spawner.
pub struct RethApi<Provider> {
    /// Shared state: the provider and the task spawner.
    inner: Arc<RethApiInner<Provider>>,
}
28
29impl<Provider> RethApi<Provider> {
32 pub fn provider(&self) -> &Provider {
34 &self.inner.provider
35 }
36
37 pub fn new(provider: Provider, task_spawner: Box<dyn TaskSpawner>) -> Self {
39 let inner = Arc::new(RethApiInner { provider, task_spawner });
40 Self { inner }
41 }
42}
43
44impl<Provider> RethApi<Provider>
45where
46 Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
47{
48 async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
50 where
51 C: FnOnce(Self) -> F,
52 F: Future<Output = EthResult<R>> + Send + 'static,
53 R: Send + 'static,
54 {
55 let (tx, rx) = oneshot::channel();
56 let this = self.clone();
57 let f = c(this);
58 self.inner.task_spawner.spawn_blocking_task(Box::pin(async move {
59 let res = f.await;
60 let _ = tx.send(res);
61 }));
62 rx.await.map_err(|_| EthApiError::InternalEthError)?
63 }
64
65 pub async fn balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
67 self.on_blocking_task(|this| async move { this.try_balance_changes_in_block(block_id) })
68 .await
69 }
70
71 fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
72 let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
73 return Err(EthApiError::HeaderNotFound(block_id))
74 };
75
76 let state = self.provider().state_by_block_id(block_id)?;
77 let accounts_before = self.provider().account_block_changeset(block_number)?;
78 let hash_map = accounts_before.iter().try_fold(
79 AddressMap::default(),
80 |mut hash_map, account_before| -> RethResult<_> {
81 let current_balance = state.account_balance(&account_before.address)?;
82 let prev_balance = account_before.info.map(|info| info.balance);
83 if current_balance != prev_balance {
84 hash_map.insert(account_before.address, current_balance.unwrap_or_default());
85 }
86 Ok(hash_map)
87 },
88 )?;
89 Ok(hash_map)
90 }
91}
92
93#[async_trait]
94impl<Provider> RethApiServer for RethApi<Provider>
95where
96 Provider: BlockReaderIdExt
97 + ChangeSetReader
98 + StateProviderFactory
99 + CanonStateSubscriptions
100 + ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
101 + PersistedBlockSubscriptions
102 + 'static,
103{
104 async fn reth_get_balance_changes_in_block(
106 &self,
107 block_id: BlockId,
108 ) -> RpcResult<AddressMap<U256>> {
109 Ok(Self::balance_changes_in_block(self, block_id).await?)
110 }
111
112 async fn reth_subscribe_chain_notifications(
114 &self,
115 pending: PendingSubscriptionSink,
116 ) -> jsonrpsee::core::SubscriptionResult {
117 let sink = pending.accept().await?;
118 let stream = self.provider().canonical_state_stream();
119 self.inner.task_spawner.spawn_task(Box::pin(pipe_from_stream(sink, stream)));
120
121 Ok(())
122 }
123
124 async fn reth_subscribe_persisted_block(
126 &self,
127 pending: PendingSubscriptionSink,
128 ) -> jsonrpsee::core::SubscriptionResult {
129 let sink = pending.accept().await?;
130 let stream = self.provider().persisted_block_stream();
131 self.inner.task_spawner.spawn_task(Box::pin(pipe_from_stream(sink, stream)));
132
133 Ok(())
134 }
135
136 async fn reth_subscribe_finalized_chain_notifications(
138 &self,
139 pending: PendingSubscriptionSink,
140 ) -> jsonrpsee::core::SubscriptionResult {
141 let sink = pending.accept().await?;
142 let canon_stream = self.provider().canonical_state_stream();
143 let finalized_stream = self.provider().finalized_block_stream();
144 self.inner.task_spawner.spawn_task(Box::pin(finalized_chain_notifications(
145 sink,
146 canon_stream,
147 finalized_stream,
148 )));
149
150 Ok(())
151 }
152}
153
154async fn pipe_from_stream<S, T>(sink: SubscriptionSink, mut stream: S)
156where
157 S: Stream<Item = T> + Unpin,
158 T: Serialize,
159{
160 loop {
161 tokio::select! {
162 _ = sink.closed() => {
163 break
164 }
165 maybe_item = stream.next() => {
166 let Some(item) = maybe_item else {
167 break
168 };
169 let msg = match SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) {
170 Ok(msg) => msg,
171 Err(err) => {
172 tracing::error!(target: "rpc::reth", %err, "Failed to serialize subscription message");
173 break
174 }
175 };
176 if sink.send(msg).await.is_err() {
177 break;
178 }
179 }
180 }
181 }
182}
183
184async fn finalized_chain_notifications<N>(
186 sink: SubscriptionSink,
187 mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
188 mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
189) where
190 N: NodePrimitives,
191{
192 let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();
193
194 loop {
195 tokio::select! {
196 _ = sink.closed() => {
197 break
198 }
199 maybe_canon = canon_stream.next() => {
200 let Some(notification) = maybe_canon else { break };
201 match ¬ification {
202 CanonStateNotification::Commit { .. } => {
203 buffered.push(notification);
204 }
205 CanonStateNotification::Reorg { .. } => {
206 buffered.clear();
207 }
208 }
209 }
210 maybe_finalized = finalized_stream.next() => {
211 let Some(finalized_header) = maybe_finalized else { break };
212 let finalized_num = finalized_header.number();
213
214 let mut committed = Vec::new();
215 buffered.retain(|n| {
216 if *n.committed().range().end() <= finalized_num {
217 committed.push(n.clone());
218 false
219 } else {
220 true
221 }
222 });
223
224 if committed.is_empty() {
225 continue;
226 }
227
228 committed.sort_by_key(|n| *n.committed().range().start());
229
230 let msg = match SubscriptionMessage::new(
231 sink.method_name(),
232 sink.subscription_id(),
233 &committed,
234 ) {
235 Ok(msg) => msg,
236 Err(err) => {
237 tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
238 break
239 }
240 };
241 if sink.send(msg).await.is_err() {
242 break;
243 }
244 }
245 }
246 }
247}
248
249impl<Provider> std::fmt::Debug for RethApi<Provider> {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 f.debug_struct("RethApi").finish_non_exhaustive()
252 }
253}
254
255impl<Provider> Clone for RethApi<Provider> {
256 fn clone(&self) -> Self {
257 Self { inner: Arc::clone(&self.inner) }
258 }
259}
260
/// State shared by all clones of [`RethApi`].
struct RethApiInner<Provider> {
    /// The provider passed to [`RethApi::new`] and returned by [`RethApi::provider`].
    provider: Provider,
    /// Spawner used for the blocking lookup task and the subscription forwarding tasks.
    task_spawner: Box<dyn TaskSpawner>,
}