1#![cfg_attr(not(test), warn(unused_crate_dependencies))]
4
5use alloy_consensus::BlockHeader as _;
6use alloy_primitives::{Bytes, B256};
7use parking_lot::Mutex;
8use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, MemoryOverlayStateProvider};
9use reth_ethereum_primitives::{Block, BlockBody, EthPrimitives};
10use reth_evm::execute::{BlockExecutorProvider, Executor};
11use reth_primitives_traits::{Block as _, Header, RecoveredBlock};
12use reth_provider::{
13 BlockReader, BlockSource, ProviderError, ProviderResult, StateProvider, StateProviderFactory,
14};
15use reth_ress_protocol::RessProtocolProvider;
16use reth_revm::{database::StateProviderDatabase, db::State, witness::ExecutionWitnessRecord};
17use reth_tasks::TaskSpawner;
18use reth_trie::{MultiProofTargets, Nibbles, TrieInput};
19use schnellru::{ByLength, LruMap};
20use std::{sync::Arc, time::Instant};
21use tokio::sync::{oneshot, Semaphore};
22use tracing::*;
23
24mod recorder;
25use recorder::StateWitnessRecorderDatabase;
26
27mod pending_state;
28pub use pending_state::*;
29
/// Reth-backed implementation of [`RessProtocolProvider`].
///
/// Serves headers, block bodies, bytecode and state witnesses by combining the node's regular
/// provider with a locally tracked [`PendingState`].
#[expect(missing_debug_implementations)]
pub struct RethRessProtocolProvider<P, E> {
    /// Node provider used for block lookups, the best block number and state providers.
    provider: P,
    /// Executor factory used to execute the target block while its state accesses are recorded.
    block_executor: E,
    /// Spawner on which the blocking witness generation task is run.
    task_spawner: Box<dyn TaskSpawner>,
    /// Maximum number of blocks a witness target may trail the best block number by.
    max_witness_window: u64,
    /// Semaphore a permit is acquired from before each witness request is processed.
    witness_semaphore: Arc<Semaphore>,
    /// LRU cache of previously generated witnesses keyed by block hash.
    witness_cache: Arc<Mutex<LruMap<B256, Arc<Vec<Bytes>>>>>,
    /// Locally tracked pending state, consulted for recovered blocks, executed blocks, invalid
    /// blocks and bytecode.
    pending_state: PendingState<EthPrimitives>,
}
41
42impl<P: Clone, E: Clone> Clone for RethRessProtocolProvider<P, E> {
43 fn clone(&self) -> Self {
44 Self {
45 provider: self.provider.clone(),
46 block_executor: self.block_executor.clone(),
47 task_spawner: self.task_spawner.clone(),
48 max_witness_window: self.max_witness_window,
49 witness_semaphore: self.witness_semaphore.clone(),
50 witness_cache: self.witness_cache.clone(),
51 pending_state: self.pending_state.clone(),
52 }
53 }
54}
55
56impl<P, E> RethRessProtocolProvider<P, E>
57where
58 P: BlockReader<Block = Block> + StateProviderFactory,
59 E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
60{
61 pub fn new(
63 provider: P,
64 block_executor: E,
65 task_spawner: Box<dyn TaskSpawner>,
66 max_witness_window: u64,
67 witness_max_parallel: usize,
68 cache_size: u32,
69 pending_state: PendingState<EthPrimitives>,
70 ) -> eyre::Result<Self> {
71 Ok(Self {
72 provider,
73 block_executor,
74 task_spawner,
75 max_witness_window,
76 witness_semaphore: Arc::new(Semaphore::new(witness_max_parallel)),
77 witness_cache: Arc::new(Mutex::new(LruMap::new(ByLength::new(cache_size)))),
78 pending_state,
79 })
80 }
81
82 pub fn block_by_hash(
84 &self,
85 block_hash: B256,
86 ) -> ProviderResult<Option<Arc<RecoveredBlock<Block>>>> {
87 let maybe_block = if let Some(block) = self.pending_state.recovered_block(&block_hash) {
90 Some(block)
91 } else if let Some(block) =
92 self.provider.find_block_by_hash(block_hash, BlockSource::Any)?
93 {
94 let signers = block.recover_signers()?;
95 Some(Arc::new(block.into_recovered_with_signers(signers)))
96 } else {
97 self.pending_state.invalid_recovered_block(&block_hash)
99 };
100 Ok(maybe_block)
101 }
102
103 pub fn generate_witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
105 if let Some(witness) = self.witness_cache.lock().get(&block_hash).cloned() {
106 return Ok(witness.as_ref().clone())
107 }
108
109 let block =
110 self.block_by_hash(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
111
112 let best_block_number = self.provider.best_block_number()?;
113 if best_block_number.saturating_sub(block.number()) > self.max_witness_window {
114 return Err(ProviderError::TrieWitnessError(
115 "witness target block exceeds maximum witness window".to_owned(),
116 ))
117 }
118
119 let mut executed_ancestors = Vec::new();
120 let mut ancestor_hash = block.parent_hash();
121 let historical = 'sp: loop {
122 match self.provider.state_by_block_hash(ancestor_hash) {
123 Ok(state_provider) => break 'sp state_provider,
124 Err(_) => {
125 let mut executed = self.pending_state.executed_block(&ancestor_hash);
127
128 if executed.is_none() {
130 if let Some(invalid) =
131 self.pending_state.invalid_recovered_block(&ancestor_hash)
132 {
133 trace!(target: "reth::ress_provider", %block_hash, %ancestor_hash, "Using invalid ancestor block for witness construction");
134 executed = Some(ExecutedBlockWithTrieUpdates {
135 block: ExecutedBlock {
136 recovered_block: invalid,
137 ..Default::default()
138 },
139 ..Default::default()
140 });
141 }
142 }
143
144 let Some(executed) = executed else {
145 return Err(ProviderError::StateForHashNotFound(ancestor_hash))
146 };
147 ancestor_hash = executed.sealed_block().parent_hash();
148 executed_ancestors.push(executed);
149 }
150 };
151 };
152
153 let mut db = StateWitnessRecorderDatabase::new(StateProviderDatabase::new(
155 MemoryOverlayStateProvider::new(historical, executed_ancestors.clone()),
156 ));
157 let mut record = ExecutionWitnessRecord::default();
158
159 if let Err(error) = self.block_executor.executor(&mut db).execute_with_state_closure(
162 &block,
163 |state: &State<_>| {
164 record.record_executed_state(state);
165 },
166 ) {
167 debug!(target: "reth::ress_provider", %block_hash, %error, "Error executing the block");
168 }
169
170 let witness_state_provider = self.provider.state_by_block_hash(ancestor_hash)?;
173 let mut trie_input = TrieInput::default();
174 for block in executed_ancestors.into_iter().rev() {
175 trie_input.append_cached_ref(&block.trie, &block.hashed_state);
176 }
177 let mut hashed_state = db.into_state();
178 hashed_state.extend(record.hashed_state);
179
180 let witness = if hashed_state.is_empty() {
182 let multiproof = witness_state_provider.multiproof(
184 trie_input,
185 MultiProofTargets::from_iter([(B256::ZERO, Default::default())]),
186 )?;
187 let mut witness = Vec::new();
188 if let Some(root_node) =
189 multiproof.account_subtree.into_inner().remove(&Nibbles::default())
190 {
191 witness.push(root_node);
192 }
193 witness
194 } else {
195 witness_state_provider.witness(trie_input, hashed_state)?
196 };
197
198 self.witness_cache.lock().insert(block_hash, Arc::new(witness.clone()));
200
201 Ok(witness)
202 }
203}
204
205impl<P, E> RessProtocolProvider for RethRessProtocolProvider<P, E>
206where
207 P: BlockReader<Block = Block> + StateProviderFactory + Clone + 'static,
208 E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
209{
210 fn header(&self, block_hash: B256) -> ProviderResult<Option<Header>> {
211 trace!(target: "reth::ress_provider", %block_hash, "Serving header");
212 Ok(self.block_by_hash(block_hash)?.map(|b| b.header().clone()))
213 }
214
215 fn block_body(&self, block_hash: B256) -> ProviderResult<Option<BlockBody>> {
216 trace!(target: "reth::ress_provider", %block_hash, "Serving block body");
217 Ok(self.block_by_hash(block_hash)?.map(|b| b.body().clone()))
218 }
219
220 fn bytecode(&self, code_hash: B256) -> ProviderResult<Option<Bytes>> {
221 trace!(target: "reth::ress_provider", %code_hash, "Serving bytecode");
222 let maybe_bytecode = 'bytecode: {
223 if let Some(bytecode) = self.pending_state.find_bytecode(code_hash) {
224 break 'bytecode Some(bytecode);
225 }
226
227 self.provider.latest()?.bytecode_by_hash(&code_hash)?
228 };
229
230 Ok(maybe_bytecode.map(|bytecode| bytecode.original_bytes()))
231 }
232
233 async fn witness(&self, block_hash: B256) -> ProviderResult<Vec<Bytes>> {
234 trace!(target: "reth::ress_provider", %block_hash, "Serving witness");
235 let started_at = Instant::now();
236 let _permit = self.witness_semaphore.acquire().await.map_err(ProviderError::other)?;
237 let this = self.clone();
238 let (tx, rx) = oneshot::channel();
239 self.task_spawner.spawn_blocking(Box::pin(async move {
240 let result = this.generate_witness(block_hash);
241 let _ = tx.send(result);
242 }));
243 match rx.await {
244 Ok(Ok(witness)) => {
245 trace!(target: "reth::ress_provider", %block_hash, elapsed = ?started_at.elapsed(), "Computed witness");
246 Ok(witness)
247 }
248 Ok(Err(error)) => Err(error),
249 Err(_) => Err(ProviderError::TrieWitnessError("dropped".to_owned())),
250 }
251 }
252}