reth_ress_provider/
lib.rs

1//! Reth implementation of [`reth_ress_protocol::RessProtocolProvider`].
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use alloy_consensus::BlockHeader as _;
12use alloy_primitives::{Bytes, B256};
13use parking_lot::Mutex;
14use reth_chain_state::{
15    ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates, MemoryOverlayStateProvider,
16};
17use reth_errors::{ProviderError, ProviderResult};
18use reth_ethereum_primitives::{Block, BlockBody, EthPrimitives};
19use reth_evm::{execute::Executor, ConfigureEvm};
20use reth_primitives_traits::{Block as _, Header, RecoveredBlock};
21use reth_ress_protocol::RessProtocolProvider;
22use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord};
23use reth_tasks::TaskSpawner;
24use reth_trie::{MultiProofTargets, Nibbles, TrieInput};
25use schnellru::{ByLength, LruMap};
26use std::{sync::Arc, time::Instant};
27use tokio::sync::{oneshot, Semaphore};
28use tracing::*;
29
30mod recorder;
31use recorder::StateWitnessRecorderDatabase;
32
33mod pending_state;
34pub use pending_state::*;
35use reth_storage_api::{BlockReader, BlockSource, StateProviderFactory};
36
37/// Reth provider implementing [`RessProtocolProvider`].
38#[expect(missing_debug_implementations)]
39#[derive(Clone)]
40pub struct RethRessProtocolProvider<P, E> {
41    provider: P,
42    evm_config: E,
43    task_spawner: Box<dyn TaskSpawner>,
44    max_witness_window: u64,
45    witness_semaphore: Arc<Semaphore>,
46    witness_cache: Arc<Mutex<LruMap<B256, Arc<Vec<Bytes>>>>>,
47    pending_state: PendingState<EthPrimitives>,
48}
49
50impl<P, E> RethRessProtocolProvider<P, E>
51where
52    P: BlockReader<Block = Block> + StateProviderFactory,
53    E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
54{
55    /// Create new ress protocol provider.
56    pub fn new(
57        provider: P,
58        evm_config: E,
59        task_spawner: Box<dyn TaskSpawner>,
60        max_witness_window: u64,
61        witness_max_parallel: usize,
62        cache_size: u32,
63        pending_state: PendingState<EthPrimitives>,
64    ) -> eyre::Result<Self> {
65        Ok(Self {
66            provider,
67            evm_config,
68            task_spawner,
69            max_witness_window,
70            witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)),
71            witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
72            pending_state,
73        })
74    }
75
76    /// Retrieve a valid or invalid block by block hash.
77    pub fn block_by_hash(
78        &self,
79        block_hash: B256,
80    ) -> ProviderResult<Option<Arc<RecoveredBlock<Block>>>> {
81        // NOTE: we keep track of the pending state locally because reth does not provider a way
82        // to access non-canonical or invalid blocks via the provider.
83        let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) {
84            Some(block)
85        } else if let Some(block) =
86            self.provider.find_block_by_hash(block_hash, BlockSource::Any)?
87        {
88            let signers = block.recover_signers()?;
89            Some(Arc::new(block.into_recovered_with_signers(signers)))
90        } else {
91            // we attempt to look up invalid block last
92            self.pending_state.invalid_recovered_block(&block_hash)
93        };
94        Ok(maybe_block)
95    }
96
97    /// Generate state witness
98    pub fn generate_witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
99        if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() {
100            return Ok(witness.as_ref().clone())
101        }
102
103        let block =
104            self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
105
106        let best_block_number = self.provider.best_block_number()?;
107        if best_block_number.saturating_sub(block.number()) > self.max_witness_window {
108            return Err(ProviderError::TrieWitnessError(
109                "witness target block exceeds maximum witness window".to_owned(),
110            ))
111        }
112
113        let mut executed_ancestors = Vec::new();
114        let mut ancestor_hash = block.parent_hash();
115        let historical = 'sp: loop {
116            match self.provider.state_by_block_hash(ancestor_hash) {
117                Ok(state_provider) => break 'sp state_provider,
118                Err(_) => {
119                    // Attempt to retrieve a valid executed block first.
120                    let mut executed = self.pending_state.executed_block(&ancestor_hash);
121
122                    // If it's not present, attempt to lookup invalid block.
123                    if executed.is_none() {
124                        if let Some(invalid) =
125                            self.pending_state.invalid_recovered_block(&ancestor_hash)
126                        {
127                            trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
128                            executed = Some(ExecutedBlockWithTrieUpdates {
129                                block: ExecutedBlock {
130                                    recovered_block: invalid,
131                                    ..Default::default()
132                                },
133                                trie: ExecutedTrieUpdates::empty(),
134                            });
135                        }
136                    }
137
138                    let Some(executed) = executed else {
139                        return Err(ProviderError::StateForHashNotFound(ancestor_hash))
140                    };
141                    ancestor_hash = executed.sealed_block().parent_hash();
142                    executed_ancestors.push(executed);
143                }
144            };
145        };
146
147        // Execute all gathered blocks to gather accesses state.
148        let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new(
149            MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()),
150        ));
151        let mut record = ExecutionWitnessRecord::default();
152
153        // We allow block execution to fail, since we still want to record all accessed state by
154        // invalid blocks.
155        if let Err(error) = self.evm_config.batch_executor(&mut db).execute_with_state_closure(
156            &block,
157            |state: &State<_>| {
158                record.record_executed_state(state);
159            },
160        ) {
161            debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block");
162        }
163
164        // NOTE: there might be a race condition where target ancestor hash gets evicted from the
165        // database.
166        let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?;
167        let mut trie_input = TrieInput::default();
168        for block in executed_ancestors.into_iter().rev() {
169            trie_input.append_cached_ref(block.trie.as_ref().unwrap(), &block.hashed_state);
170        }
171        let mut hashed_state = db.into_state();
172        hashed_state.extend(record.hashed_state);
173
174        // Gather the state witness.
175        let witness = if hashed_state.is_empty() {
176            // If no state was accessed, at least the root node must be present.
177            let multiproof = witness_state_provider.multiproof(
178                trie_input,
179                MultiProofTargets::from_iter([(B256::ZERO, Default::default())]),
180            )?;
181            let mut witness = Vec::new();
182            if let Some(root_node) =
183                multiproof.account_subtree.into_inner().remove(&Nibbles::default())
184            {
185                witness.push(root_node);
186            }
187            witness
188        } else {
189            witness_state_provider.witness(trie_input, hashed_state)?
190        };
191
192        // Insert witness into the cache.
193        let cached_witness = Arc::new(witness.clone());
194        self.witness_cache.lock().insert(block_hash, cached_witness);
195
196        Ok(witness)
197    }
198}
199
200impl<P, E> RessProtocolProvider for RethRessProtocolProvider<P, E>
201where
202    P: BlockReader<Block = Block> + StateProviderFactory + Clone + 'static,
203    E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
204{
205    fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
206        trace!(target: "reth::ress_provider", %block_hash, "Serving header");
207        Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone()))
208    }
209
210    fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
211        trace!(target: "reth::ress_provider", %block_hash, "Serving block body");
212        Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone()))
213    }
214
215    fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
216        trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode");
217        let maybe_bytecode = 'bytecode: {
218            if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) {
219                break 'bytecode Some(bytecode);
220            }
221
222            self.provider.latest()?.bytecode_by_hash(&code_hash)?
223        };
224
225        Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes()))
226    }
227
228    async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
229        trace!(target: "reth::ress_provider", %block_hash, "Serving witness");
230        let started_at = Instant::now();
231        let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?;
232        let this = self.clone();
233        let (tx, rx) = oneshot::channel();
234        self.task_spawner.spawn_blocking(Box::pin(async move {
235            let result = this.generate_witness(block_hash);
236            let _ = tx.send(result);
237        }));
238        match rx.await {
239            Ok(Ok(witness)) => {
240                trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness");
241                Ok(witness)
242            }
243            Ok(Err(error)) => Err(error),
244            Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())),
245        }
246    }
247}