Skip to main content

reth_rpc/
reth.rs

1use std::{future::Future, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockId;
5use alloy_primitives::{map::AddressMap, U256, U64};
6use async_trait::async_trait;
7use futures::{Stream, StreamExt};
8use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
9use reth_chain_state::{
10    CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
11    PersistedBlockSubscriptions,
12};
13use reth_errors::RethResult;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_execution_types::ExecutionOutcome;
16use reth_primitives_traits::{NodePrimitives, SealedHeader};
17use reth_rpc_api::RethApiServer;
18use reth_rpc_eth_types::{EthApiError, EthResult};
19use reth_storage_api::{
20    BlockReader, BlockReaderIdExt, ChangeSetReader, StateProviderFactory, TransactionVariant,
21};
22use reth_tasks::{pool::BlockingTaskGuard, Runtime};
23use serde::Serialize;
24use tokio::sync::oneshot;
25
26/// `reth` API implementation.
27///
28/// This type provides the functionality for handling `reth` prototype RPC requests.
29pub struct RethApi<Provider, EvmConfig> {
30    inner: Arc<RethApiInner<Provider, EvmConfig>>,
31}
32
33// === impl RethApi ===
34
35impl<Provider, EvmConfig> RethApi<Provider, EvmConfig> {
36    /// The provider that can interact with the chain.
37    pub fn provider(&self) -> &Provider {
38        &self.inner.provider
39    }
40
41    /// The evm config.
42    pub fn evm_config(&self) -> &EvmConfig {
43        &self.inner.evm_config
44    }
45
46    /// Create a new instance of the [`RethApi`]
47    pub fn new(
48        provider: Provider,
49        evm_config: EvmConfig,
50        blocking_task_guard: BlockingTaskGuard,
51        task_spawner: Runtime,
52    ) -> Self {
53        let inner =
54            Arc::new(RethApiInner { provider, evm_config, blocking_task_guard, task_spawner });
55        Self { inner }
56    }
57}
58
59impl<Provider, EvmConfig> RethApi<Provider, EvmConfig>
60where
61    Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
62    EvmConfig: Send + Sync + 'static,
63{
64    /// Executes the future on a new blocking task.
65    async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
66    where
67        C: FnOnce(Self) -> F,
68        F: Future<Output = EthResult<R>> + Send + 'static,
69        R: Send + 'static,
70    {
71        let (tx, rx) = oneshot::channel();
72        let this = self.clone();
73        let f = c(this);
74        self.inner.task_spawner.spawn_blocking_task(async move {
75            let res = f.await;
76            let _ = tx.send(res);
77        });
78        rx.await.map_err(|_| EthApiError::InternalEthError)?
79    }
80
81    /// Returns a map of addresses to changed account balanced for a particular block.
82    pub async fn balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
83        self.on_blocking_task(async move |this| this.try_balance_changes_in_block(block_id)).await
84    }
85
86    fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
87        let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
88            return Err(EthApiError::HeaderNotFound(block_id))
89        };
90
91        let state = self.provider().state_by_block_id(block_id)?;
92        let accounts_before = self.provider().account_block_changeset(block_number)?;
93        let hash_map = accounts_before.iter().try_fold(
94            AddressMap::default(),
95            |mut hash_map, account_before| -> RethResult<_> {
96                let current_balance = state.account_balance(&account_before.address)?;
97                let prev_balance = account_before.info.map(|info| info.balance);
98                if current_balance != prev_balance {
99                    hash_map.insert(account_before.address, current_balance.unwrap_or_default());
100                }
101                Ok(hash_map)
102            },
103        )?;
104        Ok(hash_map)
105    }
106}
107
108impl<N, Provider, EvmConfig> RethApi<Provider, EvmConfig>
109where
110    N: NodePrimitives,
111    Provider: BlockReaderIdExt
112        + ChangeSetReader
113        + StateProviderFactory
114        + BlockReader<Block = N::Block>
115        + CanonStateSubscriptions<Primitives = N>
116        + 'static,
117    EvmConfig: ConfigureEvm<Primitives = N> + 'static,
118{
119    /// Re-executes one or more consecutive blocks and returns the execution outcome.
120    pub async fn block_execution_outcome(
121        &self,
122        block_id: BlockId,
123        count: Option<U64>,
124    ) -> EthResult<Option<ExecutionOutcome<N::Receipt>>> {
125        const MAX_BLOCK_COUNT: u64 = 128;
126
127        let block_count = count.map(|c| c.to::<u64>()).unwrap_or(1);
128        if block_count == 0 || block_count > MAX_BLOCK_COUNT {
129            return Err(EthApiError::InvalidParams(format!(
130                "block count must be between 1 and {MAX_BLOCK_COUNT}, got {block_count}"
131            )))
132        }
133
134        let permit = self
135            .inner
136            .blocking_task_guard
137            .clone()
138            .acquire_owned()
139            .await
140            .map_err(|_| EthApiError::InternalEthError)?;
141        self.on_blocking_task(async move |this| {
142            let _permit = permit;
143            this.try_block_execution_outcome(block_id, block_count)
144        })
145        .await
146    }
147
148    fn try_block_execution_outcome(
149        &self,
150        block_id: BlockId,
151        block_count: u64,
152    ) -> EthResult<Option<ExecutionOutcome<N::Receipt>>> {
153        let Some(start_block) = self.provider().block_number_for_id(block_id)? else {
154            return Ok(None)
155        };
156
157        if start_block == 0 {
158            return Ok(Some(ExecutionOutcome::default()))
159        }
160
161        let state_provider = self.provider().history_by_block_number(start_block - 1)?;
162        let db = reth_revm::database::StateProviderDatabase::new(&state_provider);
163
164        let mut blocks = Vec::with_capacity(block_count as usize);
165        for block_number in start_block..start_block + block_count {
166            let Some(block) = self
167                .provider()
168                .recovered_block(block_number.into(), TransactionVariant::WithHash)?
169            else {
170                if block_number == start_block {
171                    return Ok(None)
172                }
173                break;
174            };
175            blocks.push(block);
176        }
177
178        let outcome = self.evm_config().executor(db).execute_batch(&blocks).map_err(
179            |e: reth_evm::execute::BlockExecutionError| {
180                EthApiError::Internal(reth_errors::RethError::Other(e.into()))
181            },
182        )?;
183
184        Ok(Some(outcome))
185    }
186}
187
188#[async_trait]
189impl<Provider, EvmConfig> RethApiServer for RethApi<Provider, EvmConfig>
190where
191    Provider: BlockReaderIdExt
192        + ChangeSetReader
193        + StateProviderFactory
194        + BlockReader<Block = <Provider::Primitives as NodePrimitives>::Block>
195        + CanonStateSubscriptions
196        + ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
197        + PersistedBlockSubscriptions
198        + 'static,
199    EvmConfig: ConfigureEvm<Primitives = Provider::Primitives> + 'static,
200{
201    /// Handler for `reth_getBalanceChangesInBlock`
202    async fn reth_get_balance_changes_in_block(
203        &self,
204        block_id: BlockId,
205    ) -> RpcResult<AddressMap<U256>> {
206        Ok(Self::balance_changes_in_block(self, block_id).await?)
207    }
208
209    /// Handler for `reth_getBlockExecutionOutcome`
210    async fn reth_get_block_execution_outcome(
211        &self,
212        block_id: BlockId,
213        count: Option<U64>,
214    ) -> RpcResult<Option<serde_json::Value>> {
215        let outcome = Self::block_execution_outcome(self, block_id, count).await?;
216        match outcome {
217            Some(outcome) => {
218                let value = serde_json::to_value(&outcome).map_err(|e| {
219                    EthApiError::Internal(reth_errors::RethError::msg(e.to_string()))
220                })?;
221                Ok(Some(value))
222            }
223            None => Ok(None),
224        }
225    }
226
227    /// Handler for `reth_subscribeChainNotifications`
228    async fn reth_subscribe_chain_notifications(
229        &self,
230        pending: PendingSubscriptionSink,
231    ) -> jsonrpsee::core::SubscriptionResult {
232        let sink = pending.accept().await?;
233        let stream = self.provider().canonical_state_stream();
234        self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream));
235
236        Ok(())
237    }
238
239    /// Handler for `reth_subscribePersistedBlock`
240    async fn reth_subscribe_persisted_block(
241        &self,
242        pending: PendingSubscriptionSink,
243    ) -> jsonrpsee::core::SubscriptionResult {
244        let sink = pending.accept().await?;
245        let stream = self.provider().persisted_block_stream();
246        self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream));
247
248        Ok(())
249    }
250
251    /// Handler for `reth_subscribeFinalizedChainNotifications`
252    async fn reth_subscribe_finalized_chain_notifications(
253        &self,
254        pending: PendingSubscriptionSink,
255    ) -> jsonrpsee::core::SubscriptionResult {
256        let sink = pending.accept().await?;
257        let canon_stream = self.provider().canonical_state_stream();
258        let finalized_stream = self.provider().finalized_block_stream();
259        self.inner.task_spawner.spawn_task(finalized_chain_notifications(
260            sink,
261            canon_stream,
262            finalized_stream,
263        ));
264
265        Ok(())
266    }
267}
268
269/// Pipes all stream items to the subscription sink.
270async fn pipe_from_stream<S, T>(sink: SubscriptionSink, mut stream: S)
271where
272    S: Stream<Item = T> + Unpin,
273    T: Serialize,
274{
275    loop {
276        tokio::select! {
277            _ = sink.closed() => {
278                break
279            }
280            maybe_item = stream.next() => {
281                let Some(item) = maybe_item else {
282                    break
283                };
284                let msg = match SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) {
285                    Ok(msg) => msg,
286                    Err(err) => {
287                        tracing::error!(target: "rpc::reth", %err, "Failed to serialize subscription message");
288                        break
289                    }
290                };
291                if sink.send(msg).await.is_err() {
292                    break;
293                }
294            }
295        }
296    }
297}
298
299/// Buffers committed chain notifications and emits them when a new finalized block is received.
300async fn finalized_chain_notifications<N>(
301    sink: SubscriptionSink,
302    mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
303    mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
304) where
305    N: NodePrimitives,
306{
307    let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();
308
309    loop {
310        tokio::select! {
311            _ = sink.closed() => {
312                break
313            }
314            maybe_canon = canon_stream.next() => {
315                let Some(notification) = maybe_canon else { break };
316                match &notification {
317                    CanonStateNotification::Commit { .. } => {
318                        buffered.push(notification);
319                    }
320                    CanonStateNotification::Reorg { .. } => {
321                        buffered.clear();
322                    }
323                }
324            }
325            maybe_finalized = finalized_stream.next() => {
326                let Some(finalized_header) = maybe_finalized else { break };
327                let finalized_num = finalized_header.number();
328
329                let mut committed = Vec::new();
330                buffered.retain(|n| {
331                    if *n.committed().range().end() <= finalized_num {
332                        committed.push(n.clone());
333                        false
334                    } else {
335                        true
336                    }
337                });
338
339                if committed.is_empty() {
340                    continue;
341                }
342
343                committed.sort_by_key(|n| *n.committed().range().start());
344
345                let msg = match SubscriptionMessage::new(
346                    sink.method_name(),
347                    sink.subscription_id(),
348                    &committed,
349                ) {
350                    Ok(msg) => msg,
351                    Err(err) => {
352                        tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
353                        break
354                    }
355                };
356                if sink.send(msg).await.is_err() {
357                    break;
358                }
359            }
360        }
361    }
362}
363
364impl<Provider, EvmConfig> std::fmt::Debug for RethApi<Provider, EvmConfig> {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        f.debug_struct("RethApi").finish_non_exhaustive()
367    }
368}
369
370impl<Provider, EvmConfig> Clone for RethApi<Provider, EvmConfig> {
371    fn clone(&self) -> Self {
372        Self { inner: Arc::clone(&self.inner) }
373    }
374}
375
376struct RethApiInner<Provider, EvmConfig> {
377    /// The provider that can interact with the chain.
378    provider: Provider,
379    /// The EVM configuration used to create block executors.
380    evm_config: EvmConfig,
381    /// Guard to restrict the number of concurrent block re-execution requests.
382    blocking_task_guard: BlockingTaskGuard,
383    /// The type that can spawn tasks which would otherwise block.
384    task_spawner: Runtime,
385}