1use crate::proof_task::StorageProofResultMessage;
2use alloy_primitives::{map::B256Map, B256};
3use alloy_rlp::Encodable;
4use core::cell::RefCell;
5use crossbeam_channel::Receiver as CrossbeamReceiver;
6use reth_execution_errors::trie::StateProofError;
7use reth_primitives_traits::{dashmap::DashMap, Account};
8use reth_storage_errors::db::DatabaseError;
9use reth_trie::{
10 hashed_cursor::HashedStorageCursor,
11 proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
12 trie_cursor::TrieStorageCursor,
13 ProofTrieNodeV2,
14};
15use std::{
16 rc::Rc,
17 sync::Arc,
18 time::{Duration, Instant},
19};
20
/// Counters and timings gathered while account leaf values are being encoded.
///
/// One instance is shared between an `AsyncAccountValueEncoder` and the deferred encoders it
/// hands out; several instances can be merged with [`ValueEncoderStats::extend`].
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ValueEncoderStats {
    /// Total time spent blocked on storage proof result channels.
    pub(crate) storage_wait_time: Duration,
    /// Number of deferred encoders created in the `Dispatched` state.
    pub(crate) dispatched_count: u64,
    /// Number of deferred encoders created in the `FromCache` state.
    pub(crate) from_cache_count: u64,
    /// Number of deferred encoders created in the `Sync` state.
    pub(crate) sync_count: u64,
    /// Number of dispatched results that arrived without a storage root, forcing the root to
    /// be computed locally.
    pub(crate) dispatched_missing_root_count: u64,
}

impl ValueEncoderStats {
    /// Adds every duration and counter of `other` onto `self`; `other` is left untouched.
    pub(crate) fn extend(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a field to the struct without handling it here
        // becomes a compile error instead of a silently dropped statistic.
        let Self {
            storage_wait_time,
            dispatched_count,
            from_cache_count,
            sync_count,
            dispatched_missing_root_count,
        } = *other;

        self.storage_wait_time += storage_wait_time;
        self.dispatched_count += dispatched_count;
        self.from_cache_count += from_cache_count;
        self.sync_count += sync_count;
        self.dispatched_missing_root_count += dispatched_missing_root_count;
    }
}
49
50pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
52 Dispatched {
54 hashed_address: B256,
55 account: Account,
56 proof_result_rx:
60 Option<Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>>,
61 storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNodeV2>>>>,
63 stats: Rc<RefCell<ValueEncoderStats>>,
65 storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
68 cached_storage_roots: Arc<DashMap<B256, B256>>,
70 },
71 FromCache { account: Account, root: B256 },
73 Sync {
75 storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
77 hashed_address: B256,
78 account: Account,
79 cached_storage_roots: Arc<DashMap<B256, B256>>,
81 },
82}
83
84impl<TC, HC> Drop for AsyncAccountDeferredValueEncoder<TC, HC> {
85 fn drop(&mut self) {
86 let res = if let Self::Dispatched {
89 hashed_address,
90 proof_result_rx,
91 storage_proof_results,
92 stats,
93 ..
94 } = self
95 {
96 let Some(proof_result_rx) = proof_result_rx.take() else { return };
98
99 (|| -> Result<(), StateProofError> {
100 let rx = proof_result_rx?;
101
102 let wait_start = Instant::now();
103 let msg = rx.recv().map_err(|_| {
104 StateProofError::Database(DatabaseError::Other(format!(
105 "Storage proof channel closed for {hashed_address:?}",
106 )))
107 })?;
108 let result = msg.result?;
109
110 stats.borrow_mut().storage_wait_time += wait_start.elapsed();
111
112 storage_proof_results.borrow_mut().insert(*hashed_address, result.proof);
113 Ok(())
114 })()
115 } else {
116 return;
117 };
118
119 if let Err(err) = res {
120 tracing::error!(target: "trie::parallel", %err, "Failed to collect storage proof in deferred encoder drop");
121 }
122 }
123}
124
125impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
126where
127 TC: TrieStorageCursor,
128 HC: HashedStorageCursor<Value = alloy_primitives::U256>,
129{
130 fn encode(mut self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
131 let (account, root) = match &mut self {
132 Self::Dispatched {
133 hashed_address,
134 account,
135 proof_result_rx,
136 storage_proof_results,
137 stats,
138 storage_calculator,
139 cached_storage_roots,
140 } => {
141 let hashed_address = *hashed_address;
142 let account = *account;
143 let proof_result_rx = proof_result_rx
145 .take()
146 .expect("encode called on already-consumed Dispatched encoder");
147 let wait_start = Instant::now();
148 let result = proof_result_rx?
149 .recv()
150 .map_err(|_| {
151 StateProofError::Database(DatabaseError::Other(format!(
152 "Storage proof channel closed for {hashed_address:?}",
153 )))
154 })?
155 .result?;
156 stats.borrow_mut().storage_wait_time += wait_start.elapsed();
157
158 storage_proof_results.borrow_mut().insert(hashed_address, result.proof);
159
160 let root = match result.root {
161 Some(root) => root,
162 None => {
163 stats.borrow_mut().dispatched_missing_root_count += 1;
170
171 let mut calculator = storage_calculator.borrow_mut();
172 let root_node = calculator.storage_root_node(hashed_address)?;
173 let storage_root = calculator
174 .compute_root_hash(&[root_node])?
175 .expect("storage_root_node returns a node at empty path");
176
177 cached_storage_roots.insert(hashed_address, storage_root);
178 storage_root
179 }
180 };
181
182 (account, root)
183 }
184 Self::FromCache { account, root } => (*account, *root),
185 Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
186 let hashed_address = *hashed_address;
187 let account = *account;
188 let mut calculator = storage_calculator.borrow_mut();
189 let root_node = calculator.storage_root_node(hashed_address)?;
190 let storage_root = calculator
191 .compute_root_hash(&[root_node])?
192 .expect("storage_root_node returns a node at empty path");
193
194 cached_storage_roots.insert(hashed_address, storage_root);
195 (account, storage_root)
196 }
197 };
198
199 let account = account.into_trie_account(root);
200 account.encode(buf);
201 Ok(())
202 }
203}
204
205pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
214 dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
216 cached_storage_roots: Arc<DashMap<B256, B256>>,
219 storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNodeV2>>>>,
222 storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
225 stats: Rc<RefCell<ValueEncoderStats>>,
227}
228
229impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
230 pub(crate) fn new(
238 dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
239 cached_storage_roots: Arc<DashMap<B256, B256>>,
240 storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
241 ) -> Self {
242 Self {
243 dispatched,
244 cached_storage_roots,
245 storage_proof_results: Default::default(),
246 storage_calculator,
247 stats: Default::default(),
248 }
249 }
250
251 pub(crate) fn finalize(
261 self,
262 ) -> Result<(B256Map<Vec<ProofTrieNodeV2>>, ValueEncoderStats), StateProofError> {
263 let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
264 .expect("no deferred encoders are still allocated")
265 .into_inner();
266
267 let mut stats = Rc::into_inner(self.stats)
268 .expect("no deferred encoders are still allocated")
269 .into_inner();
270
271 for (hashed_address, rx) in &self.dispatched {
274 let wait_start = Instant::now();
275 let result = rx
276 .recv()
277 .map_err(|_| {
278 StateProofError::Database(DatabaseError::Other(format!(
279 "Storage proof channel closed for {hashed_address:?}",
280 )))
281 })?
282 .result?;
283 stats.storage_wait_time += wait_start.elapsed();
284
285 storage_proof_results.insert(*hashed_address, result.proof);
286 }
287
288 Ok((storage_proof_results, stats))
289 }
290}
291
292impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
293where
294 TC: TrieStorageCursor,
295 HC: HashedStorageCursor<Value = alloy_primitives::U256>,
296{
297 type Value = Account;
298 type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
299
300 fn deferred_encoder(
301 &mut self,
302 hashed_address: B256,
303 account: Self::Value,
304 ) -> Self::DeferredEncoder {
305 if let Some(rx) = self.dispatched.remove(&hashed_address) {
308 self.stats.borrow_mut().dispatched_count += 1;
309 return AsyncAccountDeferredValueEncoder::Dispatched {
310 hashed_address,
311 account,
312 proof_result_rx: Some(Ok(rx)),
313 storage_proof_results: self.storage_proof_results.clone(),
314 stats: self.stats.clone(),
315 storage_calculator: self.storage_calculator.clone(),
316 cached_storage_roots: self.cached_storage_roots.clone(),
317 }
318 }
319
320 if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
325 self.stats.borrow_mut().from_cache_count += 1;
326 return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
327 }
328
329 self.stats.borrow_mut().sync_count += 1;
331 AsyncAccountDeferredValueEncoder::Sync {
332 storage_calculator: self.storage_calculator.clone(),
333 hashed_address,
334 account,
335 cached_storage_roots: self.cached_storage_roots.clone(),
336 }
337 }
338}