Skip to main content

reth_rpc/
reth.rs

1use std::{future::Future, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockId;
5use alloy_primitives::{map::AddressMap, U256};
6use async_trait::async_trait;
7use futures::{Stream, StreamExt};
8use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
9use reth_chain_state::{
10    CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
11    PersistedBlockSubscriptions,
12};
13use reth_errors::RethResult;
14use reth_primitives_traits::{NodePrimitives, SealedHeader};
15use reth_rpc_api::RethApiServer;
16use reth_rpc_eth_types::{EthApiError, EthResult};
17use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
18use reth_tasks::TaskSpawner;
19use serde::Serialize;
20use tokio::sync::oneshot;
21
/// `reth` API implementation.
///
/// This type provides the functionality for handling `reth` prototype RPC requests.
///
/// All state lives behind a shared [`Arc`], so a handle can be duplicated without copying the
/// provider or the task spawner.
pub struct RethApi<Provider> {
    /// Shared inner state: the chain provider and the task spawner.
    inner: Arc<RethApiInner<Provider>>,
}
28
29// === impl RethApi ===
30
31impl<Provider> RethApi<Provider> {
32    /// The provider that can interact with the chain.
33    pub fn provider(&self) -> &Provider {
34        &self.inner.provider
35    }
36
37    /// Create a new instance of the [`RethApi`]
38    pub fn new(provider: Provider, task_spawner: Box<dyn TaskSpawner>) -> Self {
39        let inner = Arc::new(RethApiInner { provider, task_spawner });
40        Self { inner }
41    }
42}
43
44impl<Provider> RethApi<Provider>
45where
46    Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
47{
48    /// Executes the future on a new blocking task.
49    async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
50    where
51        C: FnOnce(Self) -> F,
52        F: Future<Output = EthResult<R>> + Send + 'static,
53        R: Send + 'static,
54    {
55        let (tx, rx) = oneshot::channel();
56        let this = self.clone();
57        let f = c(this);
58        self.inner.task_spawner.spawn_blocking_task(Box::pin(async move {
59            let res = f.await;
60            let _ = tx.send(res);
61        }));
62        rx.await.map_err(|_| EthApiError::InternalEthError)?
63    }
64
65    /// Returns a map of addresses to changed account balanced for a particular block.
66    pub async fn balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
67        self.on_blocking_task(|this| async move { this.try_balance_changes_in_block(block_id) })
68            .await
69    }
70
71    fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
72        let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
73            return Err(EthApiError::HeaderNotFound(block_id))
74        };
75
76        let state = self.provider().state_by_block_id(block_id)?;
77        let accounts_before = self.provider().account_block_changeset(block_number)?;
78        let hash_map = accounts_before.iter().try_fold(
79            AddressMap::default(),
80            |mut hash_map, account_before| -> RethResult<_> {
81                let current_balance = state.account_balance(&account_before.address)?;
82                let prev_balance = account_before.info.map(|info| info.balance);
83                if current_balance != prev_balance {
84                    hash_map.insert(account_before.address, current_balance.unwrap_or_default());
85                }
86                Ok(hash_map)
87            },
88        )?;
89        Ok(hash_map)
90    }
91}
92
93#[async_trait]
94impl<Provider> RethApiServer for RethApi<Provider>
95where
96    Provider: BlockReaderIdExt
97        + ChangeSetReader
98        + StateProviderFactory
99        + CanonStateSubscriptions
100        + ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
101        + PersistedBlockSubscriptions
102        + 'static,
103{
104    /// Handler for `reth_getBalanceChangesInBlock`
105    async fn reth_get_balance_changes_in_block(
106        &self,
107        block_id: BlockId,
108    ) -> RpcResult<AddressMap<U256>> {
109        Ok(Self::balance_changes_in_block(self, block_id).await?)
110    }
111
112    /// Handler for `reth_subscribeChainNotifications`
113    async fn reth_subscribe_chain_notifications(
114        &self,
115        pending: PendingSubscriptionSink,
116    ) -> jsonrpsee::core::SubscriptionResult {
117        let sink = pending.accept().await?;
118        let stream = self.provider().canonical_state_stream();
119        self.inner.task_spawner.spawn_task(Box::pin(pipe_from_stream(sink, stream)));
120
121        Ok(())
122    }
123
124    /// Handler for `reth_subscribePersistedBlock`
125    async fn reth_subscribe_persisted_block(
126        &self,
127        pending: PendingSubscriptionSink,
128    ) -> jsonrpsee::core::SubscriptionResult {
129        let sink = pending.accept().await?;
130        let stream = self.provider().persisted_block_stream();
131        self.inner.task_spawner.spawn_task(Box::pin(pipe_from_stream(sink, stream)));
132
133        Ok(())
134    }
135
136    /// Handler for `reth_subscribeFinalizedChainNotifications`
137    async fn reth_subscribe_finalized_chain_notifications(
138        &self,
139        pending: PendingSubscriptionSink,
140    ) -> jsonrpsee::core::SubscriptionResult {
141        let sink = pending.accept().await?;
142        let canon_stream = self.provider().canonical_state_stream();
143        let finalized_stream = self.provider().finalized_block_stream();
144        self.inner.task_spawner.spawn_task(Box::pin(finalized_chain_notifications(
145            sink,
146            canon_stream,
147            finalized_stream,
148        )));
149
150        Ok(())
151    }
152}
153
154/// Pipes all stream items to the subscription sink.
155async fn pipe_from_stream<S, T>(sink: SubscriptionSink, mut stream: S)
156where
157    S: Stream<Item = T> + Unpin,
158    T: Serialize,
159{
160    loop {
161        tokio::select! {
162            _ = sink.closed() => {
163                break
164            }
165            maybe_item = stream.next() => {
166                let Some(item) = maybe_item else {
167                    break
168                };
169                let msg = match SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) {
170                    Ok(msg) => msg,
171                    Err(err) => {
172                        tracing::error!(target: "rpc::reth", %err, "Failed to serialize subscription message");
173                        break
174                    }
175                };
176                if sink.send(msg).await.is_err() {
177                    break;
178                }
179            }
180        }
181    }
182}
183
184/// Buffers committed chain notifications and emits them when a new finalized block is received.
185async fn finalized_chain_notifications<N>(
186    sink: SubscriptionSink,
187    mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
188    mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
189) where
190    N: NodePrimitives,
191{
192    let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();
193
194    loop {
195        tokio::select! {
196            _ = sink.closed() => {
197                break
198            }
199            maybe_canon = canon_stream.next() => {
200                let Some(notification) = maybe_canon else { break };
201                match &notification {
202                    CanonStateNotification::Commit { .. } => {
203                        buffered.push(notification);
204                    }
205                    CanonStateNotification::Reorg { .. } => {
206                        buffered.clear();
207                    }
208                }
209            }
210            maybe_finalized = finalized_stream.next() => {
211                let Some(finalized_header) = maybe_finalized else { break };
212                let finalized_num = finalized_header.number();
213
214                let mut committed = Vec::new();
215                buffered.retain(|n| {
216                    if *n.committed().range().end() <= finalized_num {
217                        committed.push(n.clone());
218                        false
219                    } else {
220                        true
221                    }
222                });
223
224                if committed.is_empty() {
225                    continue;
226                }
227
228                committed.sort_by_key(|n| *n.committed().range().start());
229
230                let msg = match SubscriptionMessage::new(
231                    sink.method_name(),
232                    sink.subscription_id(),
233                    &committed,
234                ) {
235                    Ok(msg) => msg,
236                    Err(err) => {
237                        tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
238                        break
239                    }
240                };
241                if sink.send(msg).await.is_err() {
242                    break;
243                }
244            }
245        }
246    }
247}
248
249impl<Provider> std::fmt::Debug for RethApi<Provider> {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        f.debug_struct("RethApi").finish_non_exhaustive()
252    }
253}
254
255impl<Provider> Clone for RethApi<Provider> {
256    fn clone(&self) -> Self {
257        Self { inner: Arc::clone(&self.inner) }
258    }
259}
260
/// State shared by all clones of a [`RethApi`] handle, which stores it behind an `Arc`.
struct RethApiInner<Provider> {
    /// The provider that can interact with the chain.
    provider: Provider,
    /// The type that can spawn tasks which would otherwise block.
    task_spawner: Box<dyn TaskSpawner>,
}