Skip to main content

reth_trie_parallel/
value_encoder.rs

1use crate::proof_task::{StorageProofResult, StorageProofResultMessage};
2use alloy_primitives::{map::B256Map, B256};
3use alloy_rlp::Encodable;
4use core::cell::RefCell;
5use crossbeam_channel::Receiver as CrossbeamReceiver;
6use reth_execution_errors::trie::StateProofError;
7use reth_primitives_traits::{dashmap::DashMap, Account};
8use reth_storage_errors::db::DatabaseError;
9use reth_trie::{
10    hashed_cursor::HashedStorageCursor,
11    proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
12    trie_cursor::TrieStorageCursor,
13    ProofTrieNode,
14};
15use std::{
16    rc::Rc,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
/// Counters and timings gathered by [`AsyncAccountValueEncoder`] while a proof is computed.
///
/// Records how long the encoder blocked on storage proof results and how often each deferred
/// encoder variant was handed out.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ValueEncoderStats {
    /// Total time spent blocked on storage proof results coming back from dispatched workers.
    pub(crate) storage_wait_time: Duration,
    /// How many `Dispatched` encoders were created (proof was pre-dispatched to workers).
    pub(crate) dispatched_count: u64,
    /// How many `FromCache` encoders were created (storage root was already cached).
    pub(crate) from_cache_count: u64,
    /// How many `Sync` encoders were created (storage root computed synchronously).
    pub(crate) sync_count: u64,
    /// How many dispatched storage proofs came back without a root node, forcing a fallback to
    /// synchronous computation.
    pub(crate) dispatched_missing_root_count: u64,
}

impl ValueEncoderStats {
    /// Adds every value of `other` onto the matching field of `self`.
    pub(crate) fn extend(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a field to the struct without merging it here becomes
        // a compile error instead of a silently dropped metric.
        let Self {
            storage_wait_time,
            dispatched_count,
            from_cache_count,
            sync_count,
            dispatched_missing_root_count,
        } = *other;

        self.storage_wait_time += storage_wait_time;
        self.dispatched_count += dispatched_count;
        self.from_cache_count += from_cache_count;
        self.sync_count += sync_count;
        self.dispatched_missing_root_count += dispatched_missing_root_count;
    }
}
49
50/// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation.
51pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
52    /// A storage proof job was dispatched to the worker pool.
53    Dispatched {
54        hashed_address: B256,
55        account: Account,
56        /// The receiver for the storage proof result. This is an `Option` so that `encode` can
57        /// take ownership of the receiver, preventing the `Drop` impl from trying to receive on
58        /// it again.
59        proof_result_rx:
60            Option<Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>>,
61        /// Shared storage proof results.
62        storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
63        /// Shared stats for tracking wait time and counts.
64        stats: Rc<RefCell<ValueEncoderStats>>,
65        /// Shared storage proof calculator for synchronous fallback when dispatched proof has no
66        /// root.
67        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
68        /// Cache to store computed storage roots for future reuse.
69        cached_storage_roots: Arc<DashMap<B256, B256>>,
70    },
71    /// The storage root was found in cache.
72    FromCache { account: Account, root: B256 },
73    /// Synchronous storage root computation.
74    Sync {
75        /// Shared storage proof calculator for computing storage roots.
76        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
77        hashed_address: B256,
78        account: Account,
79        /// Cache to store computed storage roots for future reuse.
80        cached_storage_roots: Arc<DashMap<B256, B256>>,
81    },
82}
83
84impl<TC, HC> Drop for AsyncAccountDeferredValueEncoder<TC, HC> {
85    fn drop(&mut self) {
86        // If this is a Dispatched encoder that was never consumed via encode(), we need to
87        // receive the storage proof result to avoid losing it.
88        let res = if let Self::Dispatched {
89            hashed_address,
90            proof_result_rx,
91            storage_proof_results,
92            stats,
93            ..
94        } = self
95        {
96            // Take the receiver out - if it's None (already consumed by encode), nothing to do
97            let Some(proof_result_rx) = proof_result_rx.take() else { return };
98
99            (|| -> Result<(), StateProofError> {
100                let rx = proof_result_rx?;
101
102                let wait_start = Instant::now();
103                let msg = rx.recv().map_err(|_| {
104                    StateProofError::Database(DatabaseError::Other(format!(
105                        "Storage proof channel closed for {hashed_address:?}",
106                    )))
107                })?;
108                let result = msg.result?;
109
110                stats.borrow_mut().storage_wait_time += wait_start.elapsed();
111
112                let StorageProofResult::V2 { proof, .. } = result else {
113                    panic!("StorageProofResult is not V2: {result:?}")
114                };
115
116                storage_proof_results.borrow_mut().insert(*hashed_address, proof);
117                Ok(())
118            })()
119        } else {
120            return;
121        };
122
123        if let Err(err) = res {
124            tracing::error!(target: "trie::parallel", %err, "Failed to collect storage proof in deferred encoder drop");
125        }
126    }
127}
128
129impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
130where
131    TC: TrieStorageCursor,
132    HC: HashedStorageCursor<Value = alloy_primitives::U256>,
133{
134    fn encode(mut self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
135        let (account, root) = match &mut self {
136            Self::Dispatched {
137                hashed_address,
138                account,
139                proof_result_rx,
140                storage_proof_results,
141                stats,
142                storage_calculator,
143                cached_storage_roots,
144            } => {
145                let hashed_address = *hashed_address;
146                let account = *account;
147                // Take the receiver so Drop won't try to receive on it again
148                let proof_result_rx = proof_result_rx
149                    .take()
150                    .expect("encode called on already-consumed Dispatched encoder");
151                let wait_start = Instant::now();
152                let result = proof_result_rx?
153                    .recv()
154                    .map_err(|_| {
155                        StateProofError::Database(DatabaseError::Other(format!(
156                            "Storage proof channel closed for {hashed_address:?}",
157                        )))
158                    })?
159                    .result?;
160                stats.borrow_mut().storage_wait_time += wait_start.elapsed();
161
162                let StorageProofResult::V2 { root, proof } = result else {
163                    panic!("StorageProofResult is not V2: {result:?}")
164                };
165
166                storage_proof_results.borrow_mut().insert(hashed_address, proof);
167
168                let root = match root {
169                    Some(root) => root,
170                    None => {
171                        // In `compute_v2_account_multiproof` we ensure that all dispatched storage
172                        // proofs computations for which there is also an account proof will return
173                        // a root node, but it could happen randomly that an account which is not in
174                        // the account proof targets, but _is_ in storage proof targets, will need
175                        // to be encoded as part of general trie traversal, so we need to handle
176                        // that case here.
177                        stats.borrow_mut().dispatched_missing_root_count += 1;
178
179                        let mut calculator = storage_calculator.borrow_mut();
180                        let root_node = calculator.storage_root_node(hashed_address)?;
181                        let storage_root = calculator
182                            .compute_root_hash(&[root_node])?
183                            .expect("storage_root_node returns a node at empty path");
184
185                        cached_storage_roots.insert(hashed_address, storage_root);
186                        storage_root
187                    }
188                };
189
190                (account, root)
191            }
192            Self::FromCache { account, root } => (*account, *root),
193            Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
194                let hashed_address = *hashed_address;
195                let account = *account;
196                let mut calculator = storage_calculator.borrow_mut();
197                let root_node = calculator.storage_root_node(hashed_address)?;
198                let storage_root = calculator
199                    .compute_root_hash(&[root_node])?
200                    .expect("storage_root_node returns a node at empty path");
201
202                cached_storage_roots.insert(hashed_address, storage_root);
203                (account, storage_root)
204            }
205        };
206
207        let account = account.into_trie_account(root);
208        account.encode(buf);
209        Ok(())
210    }
211}
212
213/// Implements the [`LeafValueEncoder`] trait for accounts.
214///
215/// Accepts a set of pre-dispatched storage proof receivers for accounts whose storage roots are
216/// being computed asynchronously by worker threads.
217///
218/// For accounts without pre-dispatched proofs or cached roots, uses a shared
219/// [`StorageProofCalculator`] to compute storage roots synchronously, reusing cursors across
220/// multiple accounts.
221pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
222    /// Storage proof jobs which were dispatched ahead of time.
223    dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
224    /// Storage roots which have already been computed. This can be used only if a storage proof
225    /// wasn't dispatched for an account, otherwise we must consume the proof result.
226    cached_storage_roots: Arc<DashMap<B256, B256>>,
227    /// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is
228    /// required because [`DeferredValueEncoder`] cannot have a lifetime.
229    storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
230    /// Shared storage proof calculator for synchronous computation. Reuses cursors and internal
231    /// buffers across multiple storage root calculations.
232    storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
233    /// Shared stats for tracking wait time and variant counts.
234    stats: Rc<RefCell<ValueEncoderStats>>,
235}
236
237impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
238    /// Initializes a [`Self`] using a storage proof calculator which will be reused to calculate
239    /// storage roots synchronously.
240    ///
241    /// # Parameters
242    /// - `dispatched`: Pre-dispatched storage proof receivers for target accounts
243    /// - `cached_storage_roots`: Shared cache of already-computed storage roots
244    /// - `storage_calculator`: Shared storage proof calculator for synchronous computation
245    pub(crate) fn new(
246        dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
247        cached_storage_roots: Arc<DashMap<B256, B256>>,
248        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
249    ) -> Self {
250        Self {
251            dispatched,
252            cached_storage_roots,
253            storage_proof_results: Default::default(),
254            storage_calculator,
255            stats: Default::default(),
256        }
257    }
258
259    /// Consume [`Self`] and return all collected storage proofs along with accumulated stats.
260    ///
261    /// This method collects any remaining dispatched proofs that weren't consumed during proof
262    /// calculation and includes their wait time in the returned stats.
263    ///
264    /// # Panics
265    ///
266    /// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
267    /// been dropped.
268    pub(crate) fn finalize(
269        self,
270    ) -> Result<(B256Map<Vec<ProofTrieNode>>, ValueEncoderStats), StateProofError> {
271        let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
272            .expect("no deferred encoders are still allocated")
273            .into_inner();
274
275        let mut stats = Rc::into_inner(self.stats)
276            .expect("no deferred encoders are still allocated")
277            .into_inner();
278
279        // Any remaining dispatched proofs need to have their results collected.
280        // These are proofs that were pre-dispatched but not consumed during proof calculation.
281        for (hashed_address, rx) in &self.dispatched {
282            let wait_start = Instant::now();
283            let result = rx
284                .recv()
285                .map_err(|_| {
286                    StateProofError::Database(DatabaseError::Other(format!(
287                        "Storage proof channel closed for {hashed_address:?}",
288                    )))
289                })?
290                .result?;
291            stats.storage_wait_time += wait_start.elapsed();
292
293            let StorageProofResult::V2 { proof, .. } = result else {
294                panic!("StorageProofResult is not V2: {result:?}")
295            };
296
297            storage_proof_results.insert(*hashed_address, proof);
298        }
299
300        Ok((storage_proof_results, stats))
301    }
302}
303
304impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
305where
306    TC: TrieStorageCursor,
307    HC: HashedStorageCursor<Value = alloy_primitives::U256>,
308{
309    type Value = Account;
310    type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
311
312    fn deferred_encoder(
313        &mut self,
314        hashed_address: B256,
315        account: Self::Value,
316    ) -> Self::DeferredEncoder {
317        // If the proof job has already been dispatched for this account then it's not necessary to
318        // dispatch another.
319        if let Some(rx) = self.dispatched.remove(&hashed_address) {
320            self.stats.borrow_mut().dispatched_count += 1;
321            return AsyncAccountDeferredValueEncoder::Dispatched {
322                hashed_address,
323                account,
324                proof_result_rx: Some(Ok(rx)),
325                storage_proof_results: self.storage_proof_results.clone(),
326                stats: self.stats.clone(),
327                storage_calculator: self.storage_calculator.clone(),
328                cached_storage_roots: self.cached_storage_roots.clone(),
329            }
330        }
331
332        // If the address didn't have a job dispatched for it then we can assume it has no targets,
333        // and we only need its root.
334
335        // If the root is already calculated then just use it directly
336        if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
337            self.stats.borrow_mut().from_cache_count += 1;
338            return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
339        }
340
341        // Compute storage root synchronously using the shared calculator
342        self.stats.borrow_mut().sync_count += 1;
343        AsyncAccountDeferredValueEncoder::Sync {
344            storage_calculator: self.storage_calculator.clone(),
345            hashed_address,
346            account,
347            cached_storage_roots: self.cached_storage_roots.clone(),
348        }
349    }
350}