Skip to main content

reth_rpc/
reth.rs

1use std::{future::Future, sync::Arc};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockId;
5use alloy_primitives::{map::AddressMap, U256, U64};
6use async_trait::async_trait;
7use futures::{Stream, StreamExt};
8use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
9use reth_chain_state::{
10    CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
11    PersistedBlockSubscriptions,
12};
13use reth_errors::{RethError, RethResult};
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_execution_types::ExecutionOutcome;
16use reth_primitives_traits::{NodePrimitives, SealedHeader};
17use reth_rpc_api::{RethApiServer, RethJitAction};
18use reth_rpc_eth_types::{EthApiError, EthResult};
19use reth_storage_api::{
20    BlockReader, BlockReaderIdExt, ChangeSetReader, StateProviderFactory, TransactionVariant,
21};
22use reth_tasks::{pool::BlockingTaskGuard, Runtime};
23use serde::Serialize;
24use tokio::sync::oneshot;
25
26/// `reth` API implementation.
27///
28/// This type provides the functionality for handling `reth` prototype RPC requests.
29pub struct RethApi<Provider, EvmConfig> {
30    inner: Arc<RethApiInner<Provider, EvmConfig>>,
31}
32
33// === impl RethApi ===
34
35impl<Provider, EvmConfig> RethApi<Provider, EvmConfig> {
36    /// The provider that can interact with the chain.
37    pub fn provider(&self) -> &Provider {
38        &self.inner.provider
39    }
40
41    /// The evm config.
42    pub fn evm_config(&self) -> &EvmConfig {
43        &self.inner.evm_config
44    }
45
46    /// Create a new instance of the [`RethApi`]
47    pub fn new(
48        provider: Provider,
49        evm_config: EvmConfig,
50        blocking_task_guard: BlockingTaskGuard,
51        task_spawner: Runtime,
52    ) -> Self {
53        let inner =
54            Arc::new(RethApiInner { provider, evm_config, blocking_task_guard, task_spawner });
55        Self { inner }
56    }
57}
58
59impl<Provider, EvmConfig> RethApi<Provider, EvmConfig>
60where
61    Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
62    EvmConfig: Send + Sync + 'static,
63{
64    /// Executes the future on a new blocking task.
65    async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
66    where
67        C: FnOnce(Self) -> F,
68        F: Future<Output = EthResult<R>> + Send + 'static,
69        R: Send + 'static,
70    {
71        let (tx, rx) = oneshot::channel();
72        let this = self.clone();
73        let f = c(this);
74        self.inner.task_spawner.spawn_blocking_task(async move {
75            let res = f.await;
76            let _ = tx.send(res);
77        });
78        rx.await.map_err(|_| EthApiError::InternalEthError)?
79    }
80
81    /// Returns a map of addresses to changed account balanced for a particular block.
82    pub async fn balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
83        self.on_blocking_task(async move |this| this.try_balance_changes_in_block(block_id)).await
84    }
85
86    fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<AddressMap<U256>> {
87        let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
88            return Err(EthApiError::HeaderNotFound(block_id))
89        };
90
91        let state = self.provider().state_by_block_id(block_id)?;
92        let accounts_before = self.provider().account_block_changeset(block_number)?;
93        let hash_map = accounts_before.iter().try_fold(
94            AddressMap::default(),
95            |mut hash_map, account_before| -> RethResult<_> {
96                let current_balance = state.account_balance(&account_before.address)?;
97                let prev_balance = account_before.info.map(|info| info.balance);
98                if current_balance != prev_balance {
99                    hash_map.insert(account_before.address, current_balance.unwrap_or_default());
100                }
101                Ok(hash_map)
102            },
103        )?;
104        Ok(hash_map)
105    }
106}
107
108impl<N, Provider, EvmConfig> RethApi<Provider, EvmConfig>
109where
110    N: NodePrimitives,
111    Provider: BlockReaderIdExt
112        + ChangeSetReader
113        + StateProviderFactory
114        + BlockReader<Block = N::Block>
115        + CanonStateSubscriptions<Primitives = N>
116        + 'static,
117    EvmConfig: ConfigureEvm<Primitives = N> + 'static,
118{
119    /// Re-executes one or more consecutive blocks and returns the execution outcome.
120    pub async fn block_execution_outcome(
121        &self,
122        block_id: BlockId,
123        count: Option<U64>,
124    ) -> EthResult<Option<ExecutionOutcome<N::Receipt>>> {
125        const MAX_BLOCK_COUNT: u64 = 128;
126
127        let block_count = count.map(|c| c.to::<u64>()).unwrap_or(1);
128        if block_count == 0 || block_count > MAX_BLOCK_COUNT {
129            return Err(EthApiError::InvalidParams(format!(
130                "block count must be between 1 and {MAX_BLOCK_COUNT}, got {block_count}"
131            )))
132        }
133
134        let permit = self
135            .inner
136            .blocking_task_guard
137            .clone()
138            .acquire_owned()
139            .await
140            .map_err(|_| EthApiError::InternalEthError)?;
141        self.on_blocking_task(async move |this| {
142            let _permit = permit;
143            this.try_block_execution_outcome(block_id, block_count)
144        })
145        .await
146    }
147
148    fn try_block_execution_outcome(
149        &self,
150        block_id: BlockId,
151        block_count: u64,
152    ) -> EthResult<Option<ExecutionOutcome<N::Receipt>>> {
153        let Some(start_block) = self.provider().block_number_for_id(block_id)? else {
154            return Ok(None)
155        };
156
157        if start_block == 0 {
158            return Ok(Some(ExecutionOutcome::default()))
159        }
160
161        let state_provider = self.provider().history_by_block_number(start_block - 1)?;
162        let db = reth_revm::database::StateProviderDatabase::new(&state_provider);
163
164        let mut blocks = Vec::with_capacity(block_count as usize);
165        for block_number in start_block..start_block + block_count {
166            let Some(block) = self
167                .provider()
168                .recovered_block(block_number.into(), TransactionVariant::WithHash)?
169            else {
170                if block_number == start_block {
171                    return Ok(None)
172                }
173                break;
174            };
175            blocks.push(block);
176        }
177
178        let outcome = self.evm_config().executor(db).execute_batch(&blocks).map_err(
179            |e: reth_evm::execute::BlockExecutionError| {
180                EthApiError::Internal(reth_errors::RethError::Other(e.into()))
181            },
182        )?;
183
184        Ok(Some(outcome))
185    }
186}
187
188#[async_trait]
189impl<Provider, EvmConfig> RethApiServer for RethApi<Provider, EvmConfig>
190where
191    Provider: BlockReaderIdExt
192        + ChangeSetReader
193        + StateProviderFactory
194        + BlockReader<Block = <Provider::Primitives as NodePrimitives>::Block>
195        + CanonStateSubscriptions
196        + ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
197        + PersistedBlockSubscriptions
198        + 'static,
199    EvmConfig: ConfigureEvm<Primitives = Provider::Primitives> + 'static,
200{
201    /// Handler for `reth_getBalanceChangesInBlock`
202    async fn reth_get_balance_changes_in_block(
203        &self,
204        block_id: BlockId,
205    ) -> RpcResult<AddressMap<U256>> {
206        Ok(Self::balance_changes_in_block(self, block_id).await?)
207    }
208
209    /// Handler for `reth_getBlockExecutionOutcome`
210    async fn reth_get_block_execution_outcome(
211        &self,
212        block_id: BlockId,
213        count: Option<U64>,
214    ) -> RpcResult<Option<serde_json::Value>> {
215        let outcome = Self::block_execution_outcome(self, block_id, count).await?;
216        match outcome {
217            Some(outcome) => {
218                let value = serde_json::to_value(&outcome).map_err(|e| {
219                    EthApiError::Internal(reth_errors::RethError::msg(e.to_string()))
220                })?;
221                Ok(Some(value))
222            }
223            None => Ok(None),
224        }
225    }
226
227    /// Handler for `reth_jit`
228    async fn reth_jit(&self, action: RethJitAction) -> RpcResult<()> {
229        let Some(jit_backend) = self.evm_config().jit_backend() else {
230            return Ok(());
231        };
232
233        match action {
234            RethJitAction::Enable => jit_backend
235                .set_enabled(true)
236                .map_err(|err| EthApiError::Internal(RethError::msg(err)))?,
237            RethJitAction::Disable => jit_backend
238                .set_enabled(false)
239                .map_err(|err| EthApiError::Internal(RethError::msg(err)))?,
240            RethJitAction::Pause => jit_backend.pause(),
241            RethJitAction::Unpause => jit_backend.resume(),
242            RethJitAction::Clear => jit_backend.clear(),
243        }
244
245        Ok(())
246    }
247
248    /// Handler for `reth_subscribeChainNotifications`
249    async fn reth_subscribe_chain_notifications(
250        &self,
251        pending: PendingSubscriptionSink,
252    ) -> jsonrpsee::core::SubscriptionResult {
253        let sink = pending.accept().await?;
254        let stream = self.provider().canonical_state_stream();
255        self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream));
256
257        Ok(())
258    }
259
260    /// Handler for `reth_subscribePersistedBlock`
261    async fn reth_subscribe_persisted_block(
262        &self,
263        pending: PendingSubscriptionSink,
264    ) -> jsonrpsee::core::SubscriptionResult {
265        let sink = pending.accept().await?;
266        let stream = self.provider().persisted_block_stream();
267        self.inner.task_spawner.spawn_task(pipe_from_stream(sink, stream));
268
269        Ok(())
270    }
271
272    /// Handler for `reth_subscribeFinalizedChainNotifications`
273    async fn reth_subscribe_finalized_chain_notifications(
274        &self,
275        pending: PendingSubscriptionSink,
276    ) -> jsonrpsee::core::SubscriptionResult {
277        let sink = pending.accept().await?;
278        let canon_stream = self.provider().canonical_state_stream();
279        let finalized_stream = self.provider().finalized_block_stream();
280        self.inner.task_spawner.spawn_task(finalized_chain_notifications(
281            sink,
282            canon_stream,
283            finalized_stream,
284        ));
285
286        Ok(())
287    }
288}
289
290/// Pipes all stream items to the subscription sink.
291async fn pipe_from_stream<S, T>(sink: SubscriptionSink, mut stream: S)
292where
293    S: Stream<Item = T> + Unpin,
294    T: Serialize,
295{
296    loop {
297        tokio::select! {
298            _ = sink.closed() => {
299                break
300            }
301            maybe_item = stream.next() => {
302                let Some(item) = maybe_item else {
303                    break
304                };
305                let msg = match SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) {
306                    Ok(msg) => msg,
307                    Err(err) => {
308                        tracing::error!(target: "rpc::reth", %err, "Failed to serialize subscription message");
309                        break
310                    }
311                };
312                if sink.send(msg).await.is_err() {
313                    break;
314                }
315            }
316        }
317    }
318}
319
320/// Buffers committed chain notifications and emits them when a new finalized block is received.
321async fn finalized_chain_notifications<N>(
322    sink: SubscriptionSink,
323    mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
324    mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
325) where
326    N: NodePrimitives,
327{
328    let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();
329
330    loop {
331        tokio::select! {
332            _ = sink.closed() => {
333                break
334            }
335            maybe_canon = canon_stream.next() => {
336                let Some(notification) = maybe_canon else { break };
337                match &notification {
338                    CanonStateNotification::Commit { .. } => {
339                        buffered.push(notification);
340                    }
341                    CanonStateNotification::Reorg { .. } => {
342                        buffered.clear();
343                    }
344                }
345            }
346            maybe_finalized = finalized_stream.next() => {
347                let Some(finalized_header) = maybe_finalized else { break };
348                let finalized_num = finalized_header.number();
349
350                let mut committed = Vec::new();
351                buffered.retain(|n| {
352                    if *n.committed().range().end() <= finalized_num {
353                        committed.push(n.clone());
354                        false
355                    } else {
356                        true
357                    }
358                });
359
360                if committed.is_empty() {
361                    continue;
362                }
363
364                committed.sort_by_key(|n| *n.committed().range().start());
365
366                let msg = match SubscriptionMessage::new(
367                    sink.method_name(),
368                    sink.subscription_id(),
369                    &committed,
370                ) {
371                    Ok(msg) => msg,
372                    Err(err) => {
373                        tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
374                        break
375                    }
376                };
377                if sink.send(msg).await.is_err() {
378                    break;
379                }
380            }
381        }
382    }
383}
384
385impl<Provider, EvmConfig> std::fmt::Debug for RethApi<Provider, EvmConfig> {
386    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387        f.debug_struct("RethApi").finish_non_exhaustive()
388    }
389}
390
391impl<Provider, EvmConfig> Clone for RethApi<Provider, EvmConfig> {
392    fn clone(&self) -> Self {
393        Self { inner: Arc::clone(&self.inner) }
394    }
395}
396
397struct RethApiInner<Provider, EvmConfig> {
398    /// The provider that can interact with the chain.
399    provider: Provider,
400    /// The EVM configuration used to create block executors.
401    evm_config: EvmConfig,
402    /// Guard to restrict the number of concurrent block re-execution requests.
403    blocking_task_guard: BlockingTaskGuard,
404    /// The type that can spawn tasks which would otherwise block.
405    task_spawner: Runtime,
406}