reth_ress_provider/
lib.rs

1//! Reth implementation of [`reth_ress_protocol::RessProtocolProvider`].
2
3#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4
5use alloy_consensus::BlockHeader as _;
6use alloy_primitives::{Bytes, B256};
7use parking_lot::Mutex;
8use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, MemoryOverlayStateProvider};
9use reth_ethereum_primitives::{Block, BlockBody, EthPrimitives};
10use reth_evm::execute::{BlockExecutorProvider, Executor};
11use reth_primitives_traits::{Block as _, Header, RecoveredBlock};
12use reth_provider::{
13    BlockReader, BlockSource, ProviderError, ProviderResult, StateProvider, StateProviderFactory,
14};
15use reth_ress_protocol::RessProtocolProvider;
16use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord};
17use reth_tasks::TaskSpawner;
18use reth_trie::{MultiProofTargets, Nibbles, TrieInput};
19use schnellru::{ByLength, LruMap};
20use std::{sync::Arc, time::Instant};
21use tokio::sync::{oneshot, Semaphore};
22use tracing::*;
23
24mod recorder;
25use recorder::StateWitnessRecorderDatabase;
26
27mod pending_state;
28pub use pending_state::*;
29
30/// Reth provider implementing [`RessProtocolProvider`].
31#[expect(missing_debug_implementations)]
32pub struct RethRessProtocolProvider<P, E> {
33    provider: P,
34    block_executor: E,
35    task_spawner: Box<dyn TaskSpawner>,
36    max_witness_window: u64,
37    witness_semaphore: Arc<Semaphore>,
38    witness_cache: Arc<Mutex<LruMap<B256, Arc<Vec<Bytes>>>>>,
39    pending_state: PendingState<EthPrimitives>,
40}
41
42impl<P: Clone, E: Clone> Clone for RethRessProtocolProvider<P, E> {
43    fn clone(&self) -> Self {
44        Self {
45            provider: self.provider.clone(),
46            block_executor: self.block_executor.clone(),
47            task_spawner: self.task_spawner.clone(),
48            max_witness_window: self.max_witness_window,
49            witness_semaphore: self.witness_semaphore.clone(),
50            witness_cache: self.witness_cache.clone(),
51            pending_state: self.pending_state.clone(),
52        }
53    }
54}
55
56impl<P, E> RethRessProtocolProvider<P, E>
57where
58    P: BlockReader<Block = Block> + StateProviderFactory,
59    E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
60{
61    /// Create new ress protocol provider.
62    pub fn new(
63        provider: P,
64        block_executor: E,
65        task_spawner: Box<dyn TaskSpawner>,
66        max_witness_window: u64,
67        witness_max_parallel: usize,
68        cache_size: u32,
69        pending_state: PendingState<EthPrimitives>,
70    ) -> eyre::Result<Self> {
71        Ok(Self {
72            provider,
73            block_executor,
74            task_spawner,
75            max_witness_window,
76            witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)),
77            witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
78            pending_state,
79        })
80    }
81
82    /// Retrieve a valid or invalid block by block hash.
83    pub fn block_by_hash(
84        &self,
85        block_hash: B256,
86    ) -> ProviderResult<Option<Arc<RecoveredBlock<Block>>>> {
87        // NOTE: we keep track of the pending state locally because reth does not provider a way
88        // to access non-canonical or invalid blocks via the provider.
89        let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) {
90            Some(block)
91        } else if let Some(block) =
92            self.provider.find_block_by_hash(block_hash, BlockSource::Any)?
93        {
94            let signers = block.recover_signers()?;
95            Some(Arc::new(block.into_recovered_with_signers(signers)))
96        } else {
97            // we attempt to look up invalid block last
98            self.pending_state.invalid_recovered_block(&block_hash)
99        };
100        Ok(maybe_block)
101    }
102
103    /// Generate state witness
104    pub fn generate_witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
105        if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() {
106            return Ok(witness.as_ref().clone())
107        }
108
109        let block =
110            self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
111
112        let best_block_number = self.provider.best_block_number()?;
113        if best_block_number.saturating_sub(block.number()) > self.max_witness_window {
114            return Err(ProviderError::TrieWitnessError(
115                "witness target block exceeds maximum witness window".to_owned(),
116            ))
117        }
118
119        let mut executed_ancestors = Vec::new();
120        let mut ancestor_hash = block.parent_hash();
121        let historical = 'sp: loop {
122            match self.provider.state_by_block_hash(ancestor_hash) {
123                Ok(state_provider) => break 'sp state_provider,
124                Err(_) => {
125                    // Attempt to retrieve a valid executed block first.
126                    let mut executed = self.pending_state.executed_block(&ancestor_hash);
127
128                    // If it's not present, attempt to lookup invalid block.
129                    if executed.is_none() {
130                        if let Some(invalid) =
131                            self.pending_state.invalid_recovered_block(&ancestor_hash)
132                        {
133                            trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
134                            executed = Some(ExecutedBlockWithTrieUpdates {
135                                block: ExecutedBlock {
136                                    recovered_block: invalid,
137                                    ..Default::default()
138                                },
139                                ..Default::default()
140                            });
141                        }
142                    }
143
144                    let Some(executed) = executed else {
145                        return Err(ProviderError::StateForHashNotFound(ancestor_hash))
146                    };
147                    ancestor_hash = executed.sealed_block().parent_hash();
148                    executed_ancestors.push(executed);
149                }
150            };
151        };
152
153        // Execute all gathered blocks to gather accesses state.
154        let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new(
155            MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()),
156        ));
157        let mut record = ExecutionWitnessRecord::default();
158
159        // We allow block execution to fail, since we still want to record all accessed state by
160        // invalid blocks.
161        if let Err(error) = self.block_executor.executor(&mut db).execute_with_state_closure(
162            &block,
163            |state: &State<_>| {
164                record.record_executed_state(state);
165            },
166        ) {
167            debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block");
168        }
169
170        // NOTE: there might be a race condition where target ancestor hash gets evicted from the
171        // database.
172        let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?;
173        let mut trie_input = TrieInput::default();
174        for block in executed_ancestors.into_iter().rev() {
175            trie_input.append_cached_ref(&block.trie, &block.hashed_state);
176        }
177        let mut hashed_state = db.into_state();
178        hashed_state.extend(record.hashed_state);
179
180        // Gather the state witness.
181        let witness = if hashed_state.is_empty() {
182            // If no state was accessed, at least the root node must be present.
183            let multiproof = witness_state_provider.multiproof(
184                trie_input,
185                MultiProofTargets::from_iter([(B256::ZERO, Default::default())]),
186            )?;
187            let mut witness = Vec::new();
188            if let Some(root_node) =
189                multiproof.account_subtree.into_inner().remove(&Nibbles::default())
190            {
191                witness.push(root_node);
192            }
193            witness
194        } else {
195            witness_state_provider.witness(trie_input, hashed_state)?
196        };
197
198        // Insert witness into the cache.
199        self.witness_cache.lock().insert(block_hash, Arc::new(witness.clone()));
200
201        Ok(witness)
202    }
203}
204
205impl<P, E> RessProtocolProvider for RethRessProtocolProvider<P, E>
206where
207    P: BlockReader<Block = Block> + StateProviderFactory + Clone + 'static,
208    E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
209{
210    fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
211        trace!(target: "reth::ress_provider", %block_hash, "Serving header");
212        Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone()))
213    }
214
215    fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
216        trace!(target: "reth::ress_provider", %block_hash, "Serving block body");
217        Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone()))
218    }
219
220    fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
221        trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode");
222        let maybe_bytecode = 'bytecode: {
223            if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) {
224                break 'bytecode Some(bytecode);
225            }
226
227            self.provider.latest()?.bytecode_by_hash(&code_hash)?
228        };
229
230        Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes()))
231    }
232
233    async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
234        trace!(target: "reth::ress_provider", %block_hash, "Serving witness");
235        let started_at = Instant::now();
236        let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?;
237        let this = self.clone();
238        let (tx, rx) = oneshot::channel();
239        self.task_spawner.spawn_blocking(Box::pin(async move {
240            let result = this.generate_witness(block_hash);
241            let _ = tx.send(result);
242        }));
243        match rx.await {
244            Ok(Ok(witness)) => {
245                trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness");
246                Ok(witness)
247            }
248            Ok(Err(error)) => Err(error),
249            Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())),
250        }
251    }
252}