1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use alloy_consensus::BlockHeader as _;
12use alloy_primitives::{Bytes, B256};
13use parking_lot::Mutex;
14use reth_chain_state::{
15 ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates, MemoryOverlayStateProvider,
16};
17use reth_errors::{ProviderError, ProviderResult};
18use reth_ethereum_primitives::{Block, BlockBody, EthPrimitives};
19use reth_evm::{execute::Executor, ConfigureEvm};
20use reth_primitives_traits::{Block as _, Header, RecoveredBlock};
21use reth_ress_protocol::RessProtocolProvider;
22use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord};
23use reth_tasks::TaskSpawner;
24use reth_trie::{MultiProofTargets, Nibbles, TrieInput};
25use schnellru::{ByLength, LruMap};
26use std::{sync::Arc, time::Instant};
27use tokio::sync::{oneshot, Semaphore};
28use tracing::*;
29
30mod recorder;
31use recorder::StateWitnessRecorderDatabase;
32
33mod pending_state;
34pub use pending_state::*;
35use reth_storage_api::{BlockReader, BlockSource, StateProviderFactory};
36
37#[expect(missing_debug_implementations)]
39#[derive(Clone)]
40pub struct RethRessProtocolProvider<P, E> {
41 provider: P,
42 evm_config: E,
43 task_spawner: Box<dyn TaskSpawner>,
44 max_witness_window: u64,
45 witness_semaphore: Arc<Semaphore>,
46 witness_cache: Arc<Mutex<LruMap<B256, Arc<Vec<Bytes>>>>>,
47 pending_state: PendingState<EthPrimitives>,
48}
49
50impl<P, E> RethRessProtocolProvider<P, E>
51where
52 P: BlockReader<Block = Block> + StateProviderFactory,
53 E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
54{
55 pub fn new(
57 provider: P,
58 evm_config: E,
59 task_spawner: Box<dyn TaskSpawner>,
60 max_witness_window: u64,
61 witness_max_parallel: usize,
62 cache_size: u32,
63 pending_state: PendingState<EthPrimitives>,
64 ) -> eyre::Result<Self> {
65 Ok(Self {
66 provider,
67 evm_config,
68 task_spawner,
69 max_witness_window,
70 witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)),
71 witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
72 pending_state,
73 })
74 }
75
76 pub fn block_by_hash(
78 &self,
79 block_hash: B256,
80 ) -> ProviderResult<Option<Arc<RecoveredBlock<Block>>>> {
81 let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) {
84 Some(block)
85 } else if let Some(block) =
86 self.provider.find_block_by_hash(block_hash, BlockSource::Any)?
87 {
88 let signers = block.recover_signers()?;
89 Some(Arc::new(block.into_recovered_with_signers(signers)))
90 } else {
91 self.pending_state.invalid_recovered_block(&block_hash)
93 };
94 Ok(maybe_block)
95 }
96
97 pub fn generate_witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
99 if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() {
100 return Ok(witness.as_ref().clone())
101 }
102
103 let block =
104 self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
105
106 let best_block_number = self.provider.best_block_number()?;
107 if best_block_number.saturating_sub(block.number()) > self.max_witness_window {
108 return Err(ProviderError::TrieWitnessError(
109 "witness target block exceeds maximum witness window".to_owned(),
110 ))
111 }
112
113 let mut executed_ancestors = Vec::new();
114 let mut ancestor_hash = block.parent_hash();
115 let historical = 'sp: loop {
116 match self.provider.state_by_block_hash(ancestor_hash) {
117 Ok(state_provider) => break 'sp state_provider,
118 Err(_) => {
119 let mut executed = self.pending_state.executed_block(&ancestor_hash);
121
122 if executed.is_none() &&
124 let Some(invalid) =
125 self.pending_state.invalid_recovered_block(&ancestor_hash)
126 {
127 trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
128 executed = Some(ExecutedBlockWithTrieUpdates {
129 block: ExecutedBlock { recovered_block: invalid, ..Default::default() },
130 trie: ExecutedTrieUpdates::empty(),
131 });
132 }
133
134 let Some(executed) = executed else {
135 return Err(ProviderError::StateForHashNotFound(ancestor_hash))
136 };
137 ancestor_hash = executed.sealed_block().parent_hash();
138 executed_ancestors.push(executed);
139 }
140 };
141 };
142
143 let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new(
145 MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()),
146 ));
147 let mut record = ExecutionWitnessRecord::default();
148
149 if let Err(error) = self.evm_config.batch_executor(&mut db).execute_with_state_closure(
152 &block,
153 |state: &State<_>| {
154 record.record_executed_state(state);
155 },
156 ) {
157 debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block");
158 }
159
160 let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?;
163 let mut trie_input = TrieInput::default();
164 for block in executed_ancestors.into_iter().rev() {
165 if let Some(trie_updates) = block.trie.as_ref() {
166 trie_input.append_cached_ref(trie_updates, &block.hashed_state);
167 } else {
168 trace!(target: "reth::ress_provider", ancestor = ?block.recovered_block().num_hash(), "Missing trie updates for ancestor block");
169 return Err(ProviderError::TrieWitnessError(
170 "missing trie updates for ancestor".to_owned(),
171 ));
172 }
173 }
174 let mut hashed_state = db.into_state();
175 hashed_state.extend(record.hashed_state);
176
177 let witness = if hashed_state.is_empty() {
179 let multiproof = witness_state_provider.multiproof(
181 trie_input,
182 MultiProofTargets::from_iter([(B256::ZERO, Default::default())]),
183 )?;
184 let mut witness = Vec::new();
185 if let Some(root_node) =
186 multiproof.account_subtree.into_inner().remove(&Nibbles::default())
187 {
188 witness.push(root_node);
189 }
190 witness
191 } else {
192 witness_state_provider.witness(trie_input, hashed_state)?
193 };
194
195 let cached_witness = Arc::new(witness.clone());
197 self.witness_cache.lock().insert(block_hash, cached_witness);
198
199 Ok(witness)
200 }
201}
202
203impl<P, E> RessProtocolProvider for RethRessProtocolProvider<P, E>
204where
205 P: BlockReader<Block = Block> + StateProviderFactory + Clone + 'static,
206 E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
207{
208 fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
209 trace!(target: "reth::ress_provider", %block_hash, "Serving header");
210 Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone()))
211 }
212
213 fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
214 trace!(target: "reth::ress_provider", %block_hash, "Serving block body");
215 Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone()))
216 }
217
218 fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
219 trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode");
220 let maybe_bytecode = 'bytecode: {
221 if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) {
222 break 'bytecode Some(bytecode);
223 }
224
225 self.provider.latest()?.bytecode_by_hash(&code_hash)?
226 };
227
228 Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes()))
229 }
230
231 async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
232 trace!(target: "reth::ress_provider", %block_hash, "Serving witness");
233 let started_at = Instant::now();
234 let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?;
235 let this = self.clone();
236 let (tx, rx) = oneshot::channel();
237 self.task_spawner.spawn_blocking(Box::pin(async move {
238 let result = this.generate_witness(block_hash);
239 let _ = tx.send(result);
240 }));
241 match rx.await {
242 Ok(Ok(witness)) => {
243 trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness");
244 Ok(witness)
245 }
246 Ok(Err(error)) => Err(error),
247 Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())),
248 }
249 }
250}