1use crate::proof_task::{StorageProofResult, StorageProofResultMessage};
2use alloy_primitives::{map::B256Map, B256};
3use alloy_rlp::Encodable;
4use core::cell::RefCell;
5use crossbeam_channel::Receiver as CrossbeamReceiver;
6use reth_execution_errors::trie::StateProofError;
7use reth_primitives_traits::{dashmap::DashMap, Account};
8use reth_storage_errors::db::DatabaseError;
9use reth_trie::{
10 hashed_cursor::HashedStorageCursor,
11 proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
12 trie_cursor::TrieStorageCursor,
13 ProofTrieNode,
14};
15use std::{
16 rc::Rc,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
/// Counters describing how account storage roots were obtained while encoding
/// account leaves, plus the time spent blocked on storage proof results.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ValueEncoderStats {
    /// Total time spent blocked on storage proof result channels.
    pub(crate) storage_wait_time: Duration,
    /// Number of deferred encoders backed by a pre-dispatched storage proof.
    pub(crate) dispatched_count: u64,
    /// Number of deferred encoders whose storage root came from the root cache.
    pub(crate) from_cache_count: u64,
    /// Number of deferred encoders that compute the storage root themselves.
    pub(crate) sync_count: u64,
    /// Number of dispatched storage proofs that came back without a root, so
    /// the root had to be computed locally.
    pub(crate) dispatched_missing_root_count: u64,
}

impl ValueEncoderStats {
    /// Adds every counter of `other` onto the matching counter of `self`.
    pub(crate) fn extend(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error.
        let Self {
            storage_wait_time,
            dispatched_count,
            from_cache_count,
            sync_count,
            dispatched_missing_root_count,
        } = *other;

        self.storage_wait_time += storage_wait_time;
        self.dispatched_count += dispatched_count;
        self.from_cache_count += from_cache_count;
        self.sync_count += sync_count;
        self.dispatched_missing_root_count += dispatched_missing_root_count;
    }
}
49
/// Deferred encoder for a single account leaf, produced by
/// [`AsyncAccountValueEncoder`]. Each variant records how the account's storage
/// root is obtained when `encode` runs.
pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
    /// The storage proof result for this account is delivered over a channel.
    Dispatched {
        /// Hashed address of the account; also the key under which the received
        /// proof is stored in `storage_proof_results`.
        hashed_address: B256,
        /// Account to encode.
        account: Account,
        /// Receiver for the storage proof result. It is an `Option` so that
        /// whichever of `encode` or `Drop` runs first can `take()` it, leaving
        /// `None` to signal that the result has already been consumed.
        ///
        /// NOTE(review): only `Some(Ok(_))` is constructed in this file; the
        /// `Err` case presumably carries a failure from dispatch time — confirm.
        proof_result_rx:
            Option<Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>>,
        /// Shared map into which the received proof nodes are inserted, keyed
        /// by hashed address.
        storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
        /// Shared statistics updated while waiting for and handling the result.
        stats: Rc<RefCell<ValueEncoderStats>>,
        /// Calculator used to compute the storage root locally when the
        /// received result carries no root.
        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
        /// Shared cache that receives any storage root computed locally.
        cached_storage_roots: Arc<DashMap<B256, B256>>,
    },
    /// The storage root was already known when the encoder was created.
    FromCache { account: Account, root: B256 },
    /// The storage root is computed with the shared calculator when `encode`
    /// is called.
    Sync {
        /// Calculator used to compute the storage root.
        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
        /// Hashed address of the account whose storage root is computed.
        hashed_address: B256,
        /// Account to encode.
        account: Account,
        /// Shared cache that receives the computed storage root.
        cached_storage_roots: Arc<DashMap<B256, B256>>,
    },
}
83
84impl<TC, HC> Drop for AsyncAccountDeferredValueEncoder<TC, HC> {
85 fn drop(&mut self) {
86 let res = if let Self::Dispatched {
89 hashed_address,
90 proof_result_rx,
91 storage_proof_results,
92 stats,
93 ..
94 } = self
95 {
96 let Some(proof_result_rx) = proof_result_rx.take() else { return };
98
99 (|| -> Result<(), StateProofError> {
100 let rx = proof_result_rx?;
101
102 let wait_start = Instant::now();
103 let msg = rx.recv().map_err(|_| {
104 StateProofError::Database(DatabaseError::Other(format!(
105 "Storage proof channel closed for {hashed_address:?}",
106 )))
107 })?;
108 let result = msg.result?;
109
110 stats.borrow_mut().storage_wait_time += wait_start.elapsed();
111
112 let StorageProofResult::V2 { proof, .. } = result else {
113 panic!("StorageProofResult is not V2: {result:?}")
114 };
115
116 storage_proof_results.borrow_mut().insert(*hashed_address, proof);
117 Ok(())
118 })()
119 } else {
120 return;
121 };
122
123 if let Err(err) = res {
124 tracing::error!(target: "trie::parallel", %err, "Failed to collect storage proof in deferred encoder drop");
125 }
126 }
127}
128
129impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
130where
131 TC: TrieStorageCursor,
132 HC: HashedStorageCursor<Value = alloy_primitives::U256>,
133{
134 fn encode(mut self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
135 let (account, root) = match &mut self {
136 Self::Dispatched {
137 hashed_address,
138 account,
139 proof_result_rx,
140 storage_proof_results,
141 stats,
142 storage_calculator,
143 cached_storage_roots,
144 } => {
145 let hashed_address = *hashed_address;
146 let account = *account;
147 let proof_result_rx = proof_result_rx
149 .take()
150 .expect("encode called on already-consumed Dispatched encoder");
151 let wait_start = Instant::now();
152 let result = proof_result_rx?
153 .recv()
154 .map_err(|_| {
155 StateProofError::Database(DatabaseError::Other(format!(
156 "Storage proof channel closed for {hashed_address:?}",
157 )))
158 })?
159 .result?;
160 stats.borrow_mut().storage_wait_time += wait_start.elapsed();
161
162 let StorageProofResult::V2 { root, proof } = result else {
163 panic!("StorageProofResult is not V2: {result:?}")
164 };
165
166 storage_proof_results.borrow_mut().insert(hashed_address, proof);
167
168 let root = match root {
169 Some(root) => root,
170 None => {
171 stats.borrow_mut().dispatched_missing_root_count += 1;
178
179 let mut calculator = storage_calculator.borrow_mut();
180 let root_node = calculator.storage_root_node(hashed_address)?;
181 let storage_root = calculator
182 .compute_root_hash(&[root_node])?
183 .expect("storage_root_node returns a node at empty path");
184
185 cached_storage_roots.insert(hashed_address, storage_root);
186 storage_root
187 }
188 };
189
190 (account, root)
191 }
192 Self::FromCache { account, root } => (*account, *root),
193 Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
194 let hashed_address = *hashed_address;
195 let account = *account;
196 let mut calculator = storage_calculator.borrow_mut();
197 let root_node = calculator.storage_root_node(hashed_address)?;
198 let storage_root = calculator
199 .compute_root_hash(&[root_node])?
200 .expect("storage_root_node returns a node at empty path");
201
202 cached_storage_roots.insert(hashed_address, storage_root);
203 (account, storage_root)
204 }
205 };
206
207 let account = account.into_trie_account(root);
208 account.encode(buf);
209 Ok(())
210 }
211}
212
/// Account leaf value encoder that resolves storage roots from pre-dispatched
/// storage proofs, a shared root cache, or a local calculation, in that order
/// of preference.
pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
    /// Receivers of pre-dispatched storage proof results, keyed by hashed
    /// address. An entry is removed when a deferred encoder is created for its
    /// account; whatever remains is drained by `finalize`.
    dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
    /// Shared cache of storage roots, keyed by hashed address. Read when
    /// creating deferred encoders and written when a root is computed locally.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Storage proofs collected so far, keyed by hashed address. Shared with
    /// the deferred encoders and returned by `finalize`.
    storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
    /// Calculator shared with deferred encoders for computing storage roots
    /// locally.
    storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
    /// Statistics shared with the deferred encoders and returned by `finalize`.
    stats: Rc<RefCell<ValueEncoderStats>>,
}
236
237impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
238 pub(crate) fn new(
246 dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
247 cached_storage_roots: Arc<DashMap<B256, B256>>,
248 storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
249 ) -> Self {
250 Self {
251 dispatched,
252 cached_storage_roots,
253 storage_proof_results: Default::default(),
254 storage_calculator,
255 stats: Default::default(),
256 }
257 }
258
259 pub(crate) fn finalize(
269 self,
270 ) -> Result<(B256Map<Vec<ProofTrieNode>>, ValueEncoderStats), StateProofError> {
271 let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
272 .expect("no deferred encoders are still allocated")
273 .into_inner();
274
275 let mut stats = Rc::into_inner(self.stats)
276 .expect("no deferred encoders are still allocated")
277 .into_inner();
278
279 for (hashed_address, rx) in &self.dispatched {
282 let wait_start = Instant::now();
283 let result = rx
284 .recv()
285 .map_err(|_| {
286 StateProofError::Database(DatabaseError::Other(format!(
287 "Storage proof channel closed for {hashed_address:?}",
288 )))
289 })?
290 .result?;
291 stats.storage_wait_time += wait_start.elapsed();
292
293 let StorageProofResult::V2 { proof, .. } = result else {
294 panic!("StorageProofResult is not V2: {result:?}")
295 };
296
297 storage_proof_results.insert(*hashed_address, proof);
298 }
299
300 Ok((storage_proof_results, stats))
301 }
302}
303
304impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
305where
306 TC: TrieStorageCursor,
307 HC: HashedStorageCursor<Value = alloy_primitives::U256>,
308{
309 type Value = Account;
310 type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
311
312 fn deferred_encoder(
313 &mut self,
314 hashed_address: B256,
315 account: Self::Value,
316 ) -> Self::DeferredEncoder {
317 if let Some(rx) = self.dispatched.remove(&hashed_address) {
320 self.stats.borrow_mut().dispatched_count += 1;
321 return AsyncAccountDeferredValueEncoder::Dispatched {
322 hashed_address,
323 account,
324 proof_result_rx: Some(Ok(rx)),
325 storage_proof_results: self.storage_proof_results.clone(),
326 stats: self.stats.clone(),
327 storage_calculator: self.storage_calculator.clone(),
328 cached_storage_roots: self.cached_storage_roots.clone(),
329 }
330 }
331
332 if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
337 self.stats.borrow_mut().from_cache_count += 1;
338 return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
339 }
340
341 self.stats.borrow_mut().sync_count += 1;
343 AsyncAccountDeferredValueEncoder::Sync {
344 storage_calculator: self.storage_calculator.clone(),
345 hashed_address,
346 account,
347 cached_storage_roots: self.cached_storage_roots.clone(),
348 }
349 }
350}