reth_rpc/
reth.rs

1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use alloy_eips::BlockId;
4use alloy_primitives::{Address, U256};
5use async_trait::async_trait;
6use jsonrpsee::core::RpcResult;
7use reth_errors::RethResult;
8use reth_provider::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
9use reth_rpc_api::RethApiServer;
10use reth_rpc_eth_types::{EthApiError, EthResult};
11use reth_tasks::TaskSpawner;
12use tokio::sync::oneshot;
13
14/// `reth` API implementation.
15///
16/// This type provides the functionality for handling `reth` prototype RPC requests.
17pub struct RethApi<Provider> {
18    inner: Arc<RethApiInner<Provider>>,
19}
20
21// === impl RethApi ===
22
23impl<Provider> RethApi<Provider> {
24    /// The provider that can interact with the chain.
25    pub fn provider(&self) -> &Provider {
26        &self.inner.provider
27    }
28
29    /// Create a new instance of the [`RethApi`]
30    pub fn new(provider: Provider, task_spawner: Box<dyn TaskSpawner>) -> Self {
31        let inner = Arc::new(RethApiInner { provider, task_spawner });
32        Self { inner }
33    }
34}
35
36impl<Provider> RethApi<Provider>
37where
38    Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
39{
40    /// Executes the future on a new blocking task.
41    async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
42    where
43        C: FnOnce(Self) -> F,
44        F: Future<Output = EthResult<R>> + Send + 'static,
45        R: Send + 'static,
46    {
47        let (tx, rx) = oneshot::channel();
48        let this = self.clone();
49        let f = c(this);
50        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
51            let res = f.await;
52            let _ = tx.send(res);
53        }));
54        rx.await.map_err(|_| EthApiError::InternalEthError)?
55    }
56
57    /// Returns a map of addresses to changed account balanced for a particular block.
58    pub async fn balance_changes_in_block(
59        &self,
60        block_id: BlockId,
61    ) -> EthResult<HashMap<Address, U256>> {
62        self.on_blocking_task(|this| async move { this.try_balance_changes_in_block(block_id) })
63            .await
64    }
65
66    fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<HashMap<Address, U256>> {
67        let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
68            return Err(EthApiError::HeaderNotFound(block_id))
69        };
70
71        let state = self.provider().state_by_block_id(block_id)?;
72        let accounts_before = self.provider().account_block_changeset(block_number)?;
73        let hash_map = accounts_before.iter().try_fold(
74            HashMap::default(),
75            |mut hash_map, account_before| -> RethResult<_> {
76                let current_balance = state.account_balance(&account_before.address)?;
77                let prev_balance = account_before.info.map(|info| info.balance);
78                if current_balance != prev_balance {
79                    hash_map.insert(account_before.address, current_balance.unwrap_or_default());
80                }
81                Ok(hash_map)
82            },
83        )?;
84        Ok(hash_map)
85    }
86}
87
88#[async_trait]
89impl<Provider> RethApiServer for RethApi<Provider>
90where
91    Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
92{
93    /// Handler for `reth_getBalanceChangesInBlock`
94    async fn reth_get_balance_changes_in_block(
95        &self,
96        block_id: BlockId,
97    ) -> RpcResult<HashMap<Address, U256>> {
98        Ok(Self::balance_changes_in_block(self, block_id).await?)
99    }
100}
101
102impl<Provider> std::fmt::Debug for RethApi<Provider> {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        f.debug_struct("RethApi").finish_non_exhaustive()
105    }
106}
107
108impl<Provider> Clone for RethApi<Provider> {
109    fn clone(&self) -> Self {
110        Self { inner: Arc::clone(&self.inner) }
111    }
112}
113
114struct RethApiInner<Provider> {
115    /// The provider that can interact with the chain.
116    provider: Provider,
117    /// The type that can spawn tasks which would otherwise block.
118    task_spawner: Box<dyn TaskSpawner>,
119}