1use std::{future::Future, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockId;
5use alloy_primitives::{map::AddressMap, U256, U64};
6use async_trait::async_trait;
7use futures::{Stream, StreamExt};
8use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
9use reth_chain_state::{
10 CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
11 PersistedBlockSubscriptions,
12};
13use reth_errors::RethResult;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_execution_types::ExecutionOutcome;
16use reth_primitives_traits::{NodePrimitives, SealedHeader};
17use reth_rpc_api::RethApiServer;
18use reth_rpc_eth_types::{EthApiError, EthResult};
19use reth_storage_api::{
20 BlockReader, BlockReaderIdExt, ChangeSetReader, StateProviderFactory, TransactionVariant,
21};
22use reth_tasks::{pool::BlockingTaskGuard, Runtime};
23use serde::Serialize;
24use tokio::sync::oneshot;
25
/// `reth` API implementation.
///
/// A lightweight handle: all state lives in a reference-counted [`RethApiInner`], so the handle
/// itself only holds an `Arc`.
pub struct RethApi<Provider, EvmConfig> {
    /// Shared inner state (provider, EVM config, blocking-task guard and task spawner).
    inner: Arc<RethApiInner<Provider, EvmConfig>>,
}
32
33impl<Provider, EvmConfig> RethApi<Provider, EvmConfig> {
36 pub fn provider(&self) -> &Provider {
38 &self.inner.provider
39 }
40
41 pub fn evm_config(&self) -> &EvmConfig {
43 &self.inner.evm_config
44 }
45
46 pub fn new(
48 provider: Provider,
49 evm_config: EvmConfig,
50 blocking_task_guard: BlockingTaskGuard,
51 task_spawner: Runtime,
52 ) -> Self {
53 let inner =
54 Arc::new(RethApiInner { provider, evm_config, blocking_task_guard, task_spawner });
55 Self { inner }
56 }
57}
58
59impl<Provider, EvmConfig> RethApi<Provider, EvmConfig>
60where
61 Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
62 EvmConfig: Send + Sync + 'static,
63{
64 async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
66 where
67 C: FnOnce(Self) -> F,
68 F: Future<Output = EthResult<R>> + Send + 'static,
69 R: Send + 'static,
70 {
71 let (tx, rx) = oneshot::channel();
72 let this = self.clone();
73 let f = c(this);
74 self.inner.task_spawner.spawn_blocking_task(async move {
75 let res = f.await;
76 let _ = tx.send(res);
77 });
78 rx.await.map_err(|_| EthApiError::InternalEthError)?
79 }
80
81 pub async fn balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
83 self.on_blocking_task(async move |this| this.try_balance_changes_in_block(block_id)).await
84 }
85
86 fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
87 let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
88 return Err(EthApiError::HeaderNotFound(block_id))
89 };
90
91 let state = self.provider().state_by_block_id(block_id)?;
92 let accounts_before = self.provider().account_block_changeset(block_number)?;
93 let hash_map = accounts_before.iter().try_fold(
94 AddressMap::default(),
95 |mut hash_map, account_before| -> RethResult<_> {
96 let current_balance = state.account_balance(&account_before.address)?;
97 let prev_balance = account_before.info.map(|info| info.balance);
98 if current_balance != prev_balance {
99 hash_map.insert(account_before.address, current_balance.unwrap_or_default());
100 }
101 Ok(hash_map)
102 },
103 )?;
104 Ok(hash_map)
105 }
106}
107
108impl<N, Provider, EvmConfig> RethApi<Provider, EvmConfig>
109where
110 N: NodePrimitives,
111 Provider: BlockReaderIdExt
112 + ChangeSetReader
113 + StateProviderFactory
114 + BlockReader<Block = N::Block>
115 + CanonStateSubscriptions<Primitives = N>
116 + 'static,
117 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
118{
119 pub async fn block_execution_outcome(
121 &self,
122 block_id: BlockId,
123 count: Option<U64>,
124 ) -> EthResult<Option<ExecutionOutcome<N::Receipt>>> {
125 const MAX_BLOCK_COUNT: u64 = 128;
126
127 let block_count = count.map(|c| c.to::<u64>()).unwrap_or(1);
128 if block_count == 0 || block_count > MAX_BLOCK_COUNT {
129 return Err(EthApiError::InvalidParams(format!(
130 "block count must be between 1 and {MAX_BLOCK_COUNT}, got {block_count}"
131 )))
132 }
133
134 let permit = self
135 .inner
136 .blocking_task_guard
137 .clone()
138 .acquire_owned()
139 .await
140 .map_err(|_| EthApiError::InternalEthError)?;
141 self.on_blocking_task(async move |this| {
142 let _permit = permit;
143 this.try_block_execution_outcome(block_id, block_count)
144 })
145 .await
146 }
147
148 fn try_block_execution_outcome(
149 &self,
150 block_id: BlockId,
151 block_count: u64,
152 ) -> EthResult<Option<ExecutionOutcome<N::Receipt>>> {
153 let Some(start_block) = self.provider().block_number_for_id(block_id)? else {
154 return Ok(None)
155 };
156
157 if start_block == 0 {
158 return Ok(Some(ExecutionOutcome::default()))
159 }
160
161 let state_provider = self.provider().history_by_block_number(start_block - 1)?;
162 let db = reth_revm::database::StateProviderDatabase::new(&state_provider);
163
164 let mut blocks = Vec::with_capacity(block_count as usize);
165 for block_number in start_block..start_block + block_count {
166 let Some(block) = self
167 .provider()
168 .recovered_block(block_number.into(), TransactionVariant::WithHash)?
169 else {
170 if block_number == start_block {
171 return Ok(None)
172 }
173 break;
174 };
175 blocks.push(block);
176 }
177
178 let outcome = self.evm_config().executor(db).execute_batch(&blocks).map_err(
179 |e: reth_evm::execute::BlockExecutionError| {
180 EthApiError::Internal(reth_errors::RethError::Other(e.into()))
181 },
182 )?;
183
184 Ok(Some(outcome))
185 }
186}
187
188#[async_trait]
189impl<Provider, EvmConfig> RethApiServer for RethApi<Provider, EvmConfig>
190where
191 Provider: BlockReaderIdExt
192 + ChangeSetReader
193 + StateProviderFactory
194 + BlockReader<Block = <Provider::Primitives as NodePrimitives>::Block>
195 + CanonStateSubscriptions
196 + ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
197 + PersistedBlockSubscriptions
198 + 'static,
199 EvmConfig: ConfigureEvm<Primitives = Provider::Primitives> + 'static,
200{
201 async fn reth_get_balance_changes_in_block(
203 &self,
204 block_id: BlockId,
205 ) -> RpcResult<AddressMap<U256>> {
206 Ok(Self::balance_changes_in_block(self, block_id).await?)
207 }
208
209 async fn reth_get_block_execution_outcome(
211 &self,
212 block_id: BlockId,
213 count: Option<U64>,
214 ) -> RpcResult<Option<serde_json::Value>> {
215 let outcome = Self::block_execution_outcome(self, block_id, count).await?;
216 match outcome {
217 Some(outcome) => {
218 let value = serde_json::to_value(&outcome).map_err(|e| {
219 EthApiError::Internal(reth_errors::RethError::msg(e.to_string()))
220 })?;
221 Ok(Some(value))
222 }
223 None => Ok(None),
224 }
225 }
226
227 async fn reth_subscribe_chain_notifications(
229 &self,
230 pending: PendingSubscriptionSink,
231 ) -> jsonrpsee::core::SubscriptionResult {
232 let sink = pending.accept().await?;
233 let stream = self.provider().canonical_state_stream();
234 self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream));
235
236 Ok(())
237 }
238
239 async fn reth_subscribe_persisted_block(
241 &self,
242 pending: PendingSubscriptionSink,
243 ) -> jsonrpsee::core::SubscriptionResult {
244 let sink = pending.accept().await?;
245 let stream = self.provider().persisted_block_stream();
246 self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream));
247
248 Ok(())
249 }
250
251 async fn reth_subscribe_finalized_chain_notifications(
253 &self,
254 pending: PendingSubscriptionSink,
255 ) -> jsonrpsee::core::SubscriptionResult {
256 let sink = pending.accept().await?;
257 let canon_stream = self.provider().canonical_state_stream();
258 let finalized_stream = self.provider().finalized_block_stream();
259 self.inner.task_spawner.spawn_task(finalized_chain_notifications(
260 sink,
261 canon_stream,
262 finalized_stream,
263 ));
264
265 Ok(())
266 }
267}
268
269async fn pipe_from_stream<S, T>(sink: SubscriptionSink, mut stream: S)
271where
272 S: Stream<Item = T> + Unpin,
273 T: Serialize,
274{
275 loop {
276 tokio::select! {
277 _ = sink.closed() => {
278 break
279 }
280 maybe_item = stream.next() => {
281 let Some(item) = maybe_item else {
282 break
283 };
284 let msg = match SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) {
285 Ok(msg) => msg,
286 Err(err) => {
287 tracing::error!(target: "rpc::reth", %err, "Failed to serialize subscription message");
288 break
289 }
290 };
291 if sink.send(msg).await.is_err() {
292 break;
293 }
294 }
295 }
296 }
297}
298
299async fn finalized_chain_notifications<N>(
301 sink: SubscriptionSink,
302 mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
303 mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
304) where
305 N: NodePrimitives,
306{
307 let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();
308
309 loop {
310 tokio::select! {
311 _ = sink.closed() => {
312 break
313 }
314 maybe_canon = canon_stream.next() => {
315 let Some(notification) = maybe_canon else { break };
316 match ¬ification {
317 CanonStateNotification::Commit { .. } => {
318 buffered.push(notification);
319 }
320 CanonStateNotification::Reorg { .. } => {
321 buffered.clear();
322 }
323 }
324 }
325 maybe_finalized = finalized_stream.next() => {
326 let Some(finalized_header) = maybe_finalized else { break };
327 let finalized_num = finalized_header.number();
328
329 let mut committed = Vec::new();
330 buffered.retain(|n| {
331 if *n.committed().range().end() <= finalized_num {
332 committed.push(n.clone());
333 false
334 } else {
335 true
336 }
337 });
338
339 if committed.is_empty() {
340 continue;
341 }
342
343 committed.sort_by_key(|n| *n.committed().range().start());
344
345 let msg = match SubscriptionMessage::new(
346 sink.method_name(),
347 sink.subscription_id(),
348 &committed,
349 ) {
350 Ok(msg) => msg,
351 Err(err) => {
352 tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
353 break
354 }
355 };
356 if sink.send(msg).await.is_err() {
357 break;
358 }
359 }
360 }
361 }
362}
363
364impl<Provider, EvmConfig> std::fmt::Debug for RethApi<Provider, EvmConfig> {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 f.debug_struct("RethApi").finish_non_exhaustive()
367 }
368}
369
370impl<Provider, EvmConfig> Clone for RethApi<Provider, EvmConfig> {
371 fn clone(&self) -> Self {
372 Self { inner: Arc::clone(&self.inner) }
373 }
374}
375
/// Shared state of [`RethApi`], kept behind an `Arc` so handles are cheap to clone.
struct RethApiInner<Provider, EvmConfig> {
    /// The provider used to read blocks, changesets and state.
    provider: Provider,
    /// The EVM configuration used to create block executors.
    evm_config: EvmConfig,
    /// Guard from which a permit is acquired before block re-execution is spawned.
    blocking_task_guard: BlockingTaskGuard,
    /// Spawner used for both regular and blocking tasks.
    task_spawner: Runtime,
}