//! Reth-backed implementation of [`RessProtocolProvider`], serving headers, block bodies,
//! bytecode and state witnesses.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
// Only warn about unused crate dependencies outside of test builds.
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
// Enable the `doc_cfg` feature when the `docsrs` cfg is set.
#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use alloy_consensus::BlockHeader as _;
12use alloy_primitives::{Bytes, B256};
13use parking_lot::Mutex;
14use reth_chain_state::{ExecutedBlock, MemoryOverlayStateProvider};
15use reth_errors::{ProviderError, ProviderResult};
16use reth_ethereum_primitives::{Block, BlockBody, EthPrimitives};
17use reth_evm::{execute::Executor, ConfigureEvm};
18use reth_primitives_traits::{Block as _, Header, RecoveredBlock};
19use reth_ress_protocol::RessProtocolProvider;
20use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord};
21use reth_tasks::TaskSpawner;
22use reth_trie::{MultiProofTargets, Nibbles, TrieInput};
23use schnellru::{ByLength, LruMap};
24use std::{sync::Arc, time::Instant};
25use tokio::sync::{oneshot, Semaphore};
26use tracing::*;
27
// Provides `StateWitnessRecorderDatabase`, which wraps the state database while a block is
// re-executed for witness generation.
mod recorder;
use recorder::StateWitnessRecorderDatabase;

// Pending state tracking; every public item of the module is re-exported from the crate root.
mod pending_state;
pub use pending_state::*;
33use reth_storage_api::{BlockReader, BlockSource, StateProviderFactory};
34
/// Reth provider implementing [`RessProtocolProvider`].
///
/// Combines the underlying provider with the locally tracked [`PendingState`] to serve headers,
/// block bodies, bytecode and state witnesses.
// No `Debug` implementation is provided for this type, hence the lint expectation.
#[expect(missing_debug_implementations)]
#[derive(Clone)]
pub struct RethRessProtocolProvider<P, E> {
    /// Underlying provider queried for blocks, state providers and the best block number.
    provider: P,
    /// EVM configuration used to build the executor that re-executes the witness target block.
    evm_config: E,
    /// Spawner used to run witness generation as a blocking task.
    task_spawner: Box<dyn TaskSpawner>,
    /// Maximum allowed distance between the best block number and the witness target block
    /// number; witness generation for blocks further behind is rejected.
    max_witness_window: u64,
    /// Semaphore acquired before each witness generation task is spawned.
    witness_semaphore: Arc<Semaphore>,
    /// LRU cache of generated witnesses keyed by block hash.
    witness_cache: Arc<Mutex<LruMap<B256, Arc<Vec<Bytes>>>>>,
    /// Pending state consulted for recovered, executed and invalid blocks as well as bytecode.
    pending_state: PendingState<EthPrimitives>,
}
47
48impl<P, E> RethRessProtocolProvider<P, E>
49where
50 P: BlockReader<Block = Block> + StateProviderFactory,
51 E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
52{
53 pub fn new(
55 provider: P,
56 evm_config: E,
57 task_spawner: Box<dyn TaskSpawner>,
58 max_witness_window: u64,
59 witness_max_parallel: usize,
60 cache_size: u32,
61 pending_state: PendingState<EthPrimitives>,
62 ) -> eyre::Result<Self> {
63 Ok(Self {
64 provider,
65 evm_config,
66 task_spawner,
67 max_witness_window,
68 witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)),
69 witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
70 pending_state,
71 })
72 }
73
74 pub fn block_by_hash(
76 &self,
77 block_hash: B256,
78 ) -> ProviderResult<Option<Arc<RecoveredBlock<Block>>>> {
79 let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) {
82 Some(block)
83 } else if let Some(block) =
84 self.provider.find_block_by_hash(block_hash, BlockSource::Any)?
85 {
86 let signers = block.recover_signers()?;
87 Some(Arc::new(block.into_recovered_with_signers(signers)))
88 } else {
89 self.pending_state.invalid_recovered_block(&block_hash)
91 };
92 Ok(maybe_block)
93 }
94
95 pub fn generate_witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
97 if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() {
98 return Ok(witness.as_ref().clone())
99 }
100
101 let block =
102 self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
103
104 let best_block_number = self.provider.best_block_number()?;
105 if best_block_number.saturating_sub(block.number()) > self.max_witness_window {
106 return Err(ProviderError::TrieWitnessError(
107 "witness target block exceeds maximum witness window".to_owned(),
108 ))
109 }
110
111 let mut executed_ancestors = Vec::new();
112 let mut ancestor_hash = block.parent_hash();
113 let historical = 'sp: loop {
114 match self.provider.state_by_block_hash(ancestor_hash) {
115 Ok(state_provider) => break 'sp state_provider,
116 Err(_) => {
117 let mut executed = self.pending_state.executed_block(&ancestor_hash);
119
120 if executed.is_none() &&
122 let Some(invalid) =
123 self.pending_state.invalid_recovered_block(&ancestor_hash)
124 {
125 trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
126 executed =
127 Some(ExecutedBlock { recovered_block: invalid, ..Default::default() });
128 }
129
130 let Some(executed) = executed else {
131 return Err(ProviderError::StateForHashNotFound(ancestor_hash))
132 };
133 ancestor_hash = executed.sealed_block().parent_hash();
134 executed_ancestors.push(executed);
135 }
136 };
137 };
138
139 let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new(
141 MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()),
142 ));
143 let mut record = ExecutionWitnessRecord::default();
144
145 if let Err(error) = self.evm_config.batch_executor(&mut db).execute_with_state_closure(
148 &block,
149 |state: &State<_>| {
150 record.record_executed_state(state);
151 },
152 ) {
153 debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block");
154 }
155
156 let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?;
159 let trie_input = TrieInput::from_blocks_sorted(
160 executed_ancestors
161 .iter()
162 .rev()
163 .map(|block| (block.hashed_state.as_ref(), block.trie_updates.as_ref())),
164 );
165 let mut hashed_state = db.into_state();
166 hashed_state.extend(record.hashed_state);
167
168 let witness = if hashed_state.is_empty() {
170 let multiproof = witness_state_provider.multiproof(
172 trie_input,
173 MultiProofTargets::from_iter([(B256::ZERO, Default::default())]),
174 )?;
175 let mut witness = Vec::new();
176 if let Some(root_node) =
177 multiproof.account_subtree.into_inner().remove(&Nibbles::default())
178 {
179 witness.push(root_node);
180 }
181 witness
182 } else {
183 witness_state_provider.witness(trie_input, hashed_state)?
184 };
185
186 let cached_witness = Arc::new(witness.clone());
188 self.witness_cache.lock().insert(block_hash, cached_witness);
189
190 Ok(witness)
191 }
192}
193
194impl<P, E> RessProtocolProvider for RethRessProtocolProvider<P, E>
195where
196 P: BlockReader<Block = Block> + StateProviderFactory + Clone + 'static,
197 E: ConfigureEvm<Primitives = EthPrimitives> + 'static,
198{
199 fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
200 trace!(target: "reth::ress_provider", %block_hash, "Serving header");
201 Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone()))
202 }
203
204 fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
205 trace!(target: "reth::ress_provider", %block_hash, "Serving block body");
206 Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone()))
207 }
208
209 fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
210 trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode");
211 let maybe_bytecode = 'bytecode: {
212 if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) {
213 break 'bytecode Some(bytecode);
214 }
215
216 self.provider.latest()?.bytecode_by_hash(&code_hash)?
217 };
218
219 Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes()))
220 }
221
222 async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
223 trace!(target: "reth::ress_provider", %block_hash, "Serving witness");
224 let started_at = Instant::now();
225 let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?;
226 let this = self.clone();
227 let (tx, rx) = oneshot::channel();
228 self.task_spawner.spawn_blocking(Box::pin(async move {
229 let result = this.generate_witness(block_hash);
230 let _ = tx.send(result);
231 }));
232 match rx.await {
233 Ok(Ok(witness)) => {
234 trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness");
235 Ok(witness)
236 }
237 Ok(Err(error)) => Err(error),
238 Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())),
239 }
240 }
241}