1use std::{collections::HashMap, future::Future, sync::Arc};
2
3use alloy_eips::BlockId;
4use alloy_primitives::{Address, U256};
5use async_trait::async_trait;
6use jsonrpsee::core::RpcResult;
7use reth_errors::RethResult;
8use reth_provider::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
9use reth_rpc_api::RethApiServer;
10use reth_rpc_eth_types::{EthApiError, EthResult};
11use reth_tasks::TaskSpawner;
12use tokio::sync::oneshot;
13
/// Handler for the `reth` RPC namespace; see its [`RethApiServer`] implementation.
///
/// All state lives behind a single [`Arc`], so cloning a `RethApi` only clones that pointer.
pub struct RethApi<Provider> {
    /// Shared state: the provider and the task spawner.
    inner: Arc<RethApiInner<Provider>>,
}
20
21impl<Provider> RethApi<Provider> {
24 pub fn provider(&self) -> &Provider {
26 &self.inner.provider
27 }
28
29 pub fn new(provider: Provider, task_spawner: Box<dyn TaskSpawner>) -> Self {
31 let inner = Arc::new(RethApiInner { provider, task_spawner });
32 Self { inner }
33 }
34}
35
36impl<Provider> RethApi<Provider>
37where
38 Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
39{
40 async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
42 where
43 C: FnOnce(Self) -> F,
44 F: Future<Output = EthResult<R>> + Send + 'static,
45 R: Send + 'static,
46 {
47 let (tx, rx) = oneshot::channel();
48 let this = self.clone();
49 let f = c(this);
50 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
51 let res = f.await;
52 let _ = tx.send(res);
53 }));
54 rx.await.map_err(|_| EthApiError::InternalEthError)?
55 }
56
57 pub async fn balance_changes_in_block(
59 &self,
60 block_id: BlockId,
61 ) -> EthResult<HashMap<Address, U256>> {
62 self.on_blocking_task(|this| async move { this.try_balance_changes_in_block(block_id) })
63 .await
64 }
65
66 fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<HashMap<Address, U256>> {
67 let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
68 return Err(EthApiError::HeaderNotFound(block_id))
69 };
70
71 let state = self.provider().state_by_block_id(block_id)?;
72 let accounts_before = self.provider().account_block_changeset(block_number)?;
73 let hash_map = accounts_before.iter().try_fold(
74 HashMap::default(),
75 |mut hash_map, account_before| -> RethResult<_> {
76 let current_balance = state.account_balance(&account_before.address)?;
77 let prev_balance = account_before.info.map(|info| info.balance);
78 if current_balance != prev_balance {
79 hash_map.insert(account_before.address, current_balance.unwrap_or_default());
80 }
81 Ok(hash_map)
82 },
83 )?;
84 Ok(hash_map)
85 }
86}
87
88#[async_trait]
89impl<Provider> RethApiServer for RethApi<Provider>
90where
91 Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
92{
93 async fn reth_get_balance_changes_in_block(
95 &self,
96 block_id: BlockId,
97 ) -> RpcResult<HashMap<Address, U256>> {
98 Ok(Self::balance_changes_in_block(self, block_id).await?)
99 }
100}
101
102impl<Provider> std::fmt::Debug for RethApi<Provider> {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 f.debug_struct("RethApi").finish_non_exhaustive()
105 }
106}
107
108impl<Provider> Clone for RethApi<Provider> {
109 fn clone(&self) -> Self {
110 Self { inner: Arc::clone(&self.inner) }
111 }
112}
113
/// State shared by every clone of a [`RethApi`].
struct RethApiInner<Provider> {
    /// The provider handed to [`RethApi::new`]; the balance-change lookup reads block numbers,
    /// state and account changesets from it.
    provider: Provider,
    /// Spawner whose `spawn_blocking` runs the API's blocking tasks.
    task_spawner: Box<dyn TaskSpawner>,
}