Skip to main content

reth_trie_parallel/
value_encoder.rs

1use crate::proof_task::StorageProofResultMessage;
2use alloy_primitives::{map::B256Map, B256};
3use alloy_rlp::Encodable;
4use core::cell::RefCell;
5use crossbeam_channel::Receiver as CrossbeamReceiver;
6use reth_execution_errors::trie::StateProofError;
7use reth_primitives_traits::{dashmap::DashMap, Account};
8use reth_storage_errors::db::DatabaseError;
9use reth_trie::{
10    hashed_cursor::HashedStorageCursor,
11    proof_v2::{DeferredValueEncoder, LeafValueEncoder, StorageProofCalculator},
12    trie_cursor::TrieStorageCursor,
13    ProofTrieNodeV2,
14};
15use std::{
16    rc::Rc,
17    sync::Arc,
18    time::{Duration, Instant},
19};
20
/// Stats collected by [`AsyncAccountValueEncoder`] during proof computation.
///
/// Tracks time spent waiting for storage proofs and counts of each deferred encoder variant used.
/// Each call to `AsyncAccountValueEncoder::deferred_encoder` increments exactly one of
/// `dispatched_count`, `from_cache_count` or `sync_count`.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ValueEncoderStats {
    /// Accumulated time spent waiting for storage proof results from dispatched workers.
    ///
    /// Measured around the blocking receive in `encode`, in the deferred encoder's `Drop`, and in
    /// `finalize`. Time is only added when a result was received successfully; failed receives
    /// or error results do not contribute.
    pub(crate) storage_wait_time: Duration,
    /// Number of times the `Dispatched` variant was used (proof pre-dispatched to workers).
    ///
    /// Proofs that are never requested and are only drained by `finalize` are not counted here.
    pub(crate) dispatched_count: u64,
    /// Number of times the `FromCache` variant was used (storage root already cached).
    pub(crate) from_cache_count: u64,
    /// Number of times the `Sync` variant was used (synchronous computation).
    pub(crate) sync_count: u64,
    /// Number of times a dispatched storage proof had no root node and fell back to sync
    /// computation.
    pub(crate) dispatched_missing_root_count: u64,
}
38
39impl ValueEncoderStats {
40    /// Extends this metrics by adding the values from another.
41    pub(crate) fn extend(&mut self, other: &Self) {
42        self.storage_wait_time += other.storage_wait_time;
43        self.dispatched_count += other.dispatched_count;
44        self.from_cache_count += other.from_cache_count;
45        self.sync_count += other.sync_count;
46        self.dispatched_missing_root_count += other.dispatched_missing_root_count;
47    }
48}
49
/// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation.
///
/// Each variant records where the account's storage root will come from once
/// [`DeferredValueEncoder::encode`] is called. A `Dispatched` encoder that is dropped without
/// being encoded still receives its storage proof result in its `Drop` impl, so the proof is
/// stored in the shared results rather than lost.
pub(crate) enum AsyncAccountDeferredValueEncoder<TC, HC> {
    /// A storage proof job was dispatched to the worker pool.
    Dispatched {
        /// Hashed address of the account whose storage proof was dispatched; used as the key
        /// for the shared proof results and the root cache.
        hashed_address: B256,
        /// The account to encode once its storage root is known.
        account: Account,
        /// The receiver for the storage proof result. This is an `Option` so that `encode` can
        /// take ownership of the receiver, preventing the `Drop` impl from trying to receive on
        /// it again.
        ///
        /// An inner `Err` is returned from `encode` (or logged by `Drop`). In this file
        /// `deferred_encoder` always stores `Ok`.
        proof_result_rx:
            Option<Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>>,
        /// Shared storage proof results, keyed by hashed address; the proof received for this
        /// account is inserted here.
        storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNodeV2>>>>,
        /// Shared stats for tracking wait time and counts.
        stats: Rc<RefCell<ValueEncoderStats>>,
        /// Shared storage proof calculator for synchronous fallback when dispatched proof has no
        /// root.
        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
        /// Cache to store computed storage roots for future reuse. Only written on the
        /// synchronous fallback path.
        cached_storage_roots: Arc<DashMap<B256, B256>>,
    },
    /// The storage root was found in cache.
    ///
    /// `account` is the account to encode and `root` is its storage root as read from the cache.
    FromCache { account: Account, root: B256 },
    /// Synchronous storage root computation.
    Sync {
        /// Shared storage proof calculator for computing storage roots.
        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
        /// Hashed address of the account whose storage root is computed.
        hashed_address: B256,
        /// The account to encode once its storage root is computed.
        account: Account,
        /// Cache to store computed storage roots for future reuse.
        cached_storage_roots: Arc<DashMap<B256, B256>>,
    },
}
83
84impl<TC, HC> Drop for AsyncAccountDeferredValueEncoder<TC, HC> {
85    fn drop(&mut self) {
86        // If this is a Dispatched encoder that was never consumed via encode(), we need to
87        // receive the storage proof result to avoid losing it.
88        let res = if let Self::Dispatched {
89            hashed_address,
90            proof_result_rx,
91            storage_proof_results,
92            stats,
93            ..
94        } = self
95        {
96            // Take the receiver out - if it's None (already consumed by encode), nothing to do
97            let Some(proof_result_rx) = proof_result_rx.take() else { return };
98
99            (|| -> Result<(), StateProofError> {
100                let rx = proof_result_rx?;
101
102                let wait_start = Instant::now();
103                let msg = rx.recv().map_err(|_| {
104                    StateProofError::Database(DatabaseError::Other(format!(
105                        "Storage proof channel closed for {hashed_address:?}",
106                    )))
107                })?;
108                let result = msg.result?;
109
110                stats.borrow_mut().storage_wait_time += wait_start.elapsed();
111
112                storage_proof_results.borrow_mut().insert(*hashed_address, result.proof);
113                Ok(())
114            })()
115        } else {
116            return;
117        };
118
119        if let Err(err) = res {
120            tracing::error!(target: "trie::parallel", %err, "Failed to collect storage proof in deferred encoder drop");
121        }
122    }
123}
124
125impl<TC, HC> DeferredValueEncoder for AsyncAccountDeferredValueEncoder<TC, HC>
126where
127    TC: TrieStorageCursor,
128    HC: HashedStorageCursor<Value = alloy_primitives::U256>,
129{
130    fn encode(mut self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
131        let (account, root) = match &mut self {
132            Self::Dispatched {
133                hashed_address,
134                account,
135                proof_result_rx,
136                storage_proof_results,
137                stats,
138                storage_calculator,
139                cached_storage_roots,
140            } => {
141                let hashed_address = *hashed_address;
142                let account = *account;
143                // Take the receiver so Drop won't try to receive on it again
144                let proof_result_rx = proof_result_rx
145                    .take()
146                    .expect("encode called on already-consumed Dispatched encoder");
147                let wait_start = Instant::now();
148                let result = proof_result_rx?
149                    .recv()
150                    .map_err(|_| {
151                        StateProofError::Database(DatabaseError::Other(format!(
152                            "Storage proof channel closed for {hashed_address:?}",
153                        )))
154                    })?
155                    .result?;
156                stats.borrow_mut().storage_wait_time += wait_start.elapsed();
157
158                storage_proof_results.borrow_mut().insert(hashed_address, result.proof);
159
160                let root = match result.root {
161                    Some(root) => root,
162                    None => {
163                        // In `compute_v2_account_multiproof` we ensure that all dispatched storage
164                        // proofs computations for which there is also an account proof will return
165                        // a root node, but it could happen randomly that an account which is not in
166                        // the account proof targets, but _is_ in storage proof targets, will need
167                        // to be encoded as part of general trie traversal, so we need to handle
168                        // that case here.
169                        stats.borrow_mut().dispatched_missing_root_count += 1;
170
171                        let mut calculator = storage_calculator.borrow_mut();
172                        let root_node = calculator.storage_root_node(hashed_address)?;
173                        let storage_root = calculator
174                            .compute_root_hash(&[root_node])?
175                            .expect("storage_root_node returns a node at empty path");
176
177                        cached_storage_roots.insert(hashed_address, storage_root);
178                        storage_root
179                    }
180                };
181
182                (account, root)
183            }
184            Self::FromCache { account, root } => (*account, *root),
185            Self::Sync { storage_calculator, hashed_address, account, cached_storage_roots } => {
186                let hashed_address = *hashed_address;
187                let account = *account;
188                let mut calculator = storage_calculator.borrow_mut();
189                let root_node = calculator.storage_root_node(hashed_address)?;
190                let storage_root = calculator
191                    .compute_root_hash(&[root_node])?
192                    .expect("storage_root_node returns a node at empty path");
193
194                cached_storage_roots.insert(hashed_address, storage_root);
195                (account, storage_root)
196            }
197        };
198
199        let account = account.into_trie_account(root);
200        account.encode(buf);
201        Ok(())
202    }
203}
204
/// Implements the [`LeafValueEncoder`] trait for accounts.
///
/// Accepts a set of pre-dispatched storage proof receivers for accounts whose storage roots are
/// being computed asynchronously by worker threads.
///
/// For accounts without pre-dispatched proofs or cached roots, uses a shared
/// [`StorageProofCalculator`] to compute storage roots synchronously, reusing cursors across
/// multiple accounts.
pub(crate) struct AsyncAccountValueEncoder<TC, HC> {
    /// Storage proof jobs which were dispatched ahead of time, keyed by hashed address. Entries
    /// are removed by `deferred_encoder` when their account is encoded; any left over are
    /// received by `finalize`.
    dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
    /// Storage roots which have already been computed. This can be used only if a storage proof
    /// wasn't dispatched for an account, otherwise we must consume the proof result.
    /// Synchronous root computations in the deferred encoders insert into this cache.
    cached_storage_roots: Arc<DashMap<B256, B256>>,
    /// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is
    /// required because [`DeferredValueEncoder`] cannot have a lifetime.
    storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNodeV2>>>>,
    /// Shared storage proof calculator for synchronous computation. Reuses cursors and internal
    /// buffers across multiple storage root calculations.
    storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
    /// Shared stats for tracking wait time and variant counts. Cloned into every `Dispatched`
    /// deferred encoder.
    stats: Rc<RefCell<ValueEncoderStats>>,
}
228
229impl<TC, HC> AsyncAccountValueEncoder<TC, HC> {
230    /// Initializes a [`Self`] using a storage proof calculator which will be reused to calculate
231    /// storage roots synchronously.
232    ///
233    /// # Parameters
234    /// - `dispatched`: Pre-dispatched storage proof receivers for target accounts
235    /// - `cached_storage_roots`: Shared cache of already-computed storage roots
236    /// - `storage_calculator`: Shared storage proof calculator for synchronous computation
237    pub(crate) fn new(
238        dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
239        cached_storage_roots: Arc<DashMap<B256, B256>>,
240        storage_calculator: Rc<RefCell<StorageProofCalculator<TC, HC>>>,
241    ) -> Self {
242        Self {
243            dispatched,
244            cached_storage_roots,
245            storage_proof_results: Default::default(),
246            storage_calculator,
247            stats: Default::default(),
248        }
249    }
250
251    /// Consume [`Self`] and return all collected storage proofs along with accumulated stats.
252    ///
253    /// This method collects any remaining dispatched proofs that weren't consumed during proof
254    /// calculation and includes their wait time in the returned stats.
255    ///
256    /// # Panics
257    ///
258    /// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
259    /// been dropped.
260    pub(crate) fn finalize(
261        self,
262    ) -> Result<(B256Map<Vec<ProofTrieNodeV2>>, ValueEncoderStats), StateProofError> {
263        let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
264            .expect("no deferred encoders are still allocated")
265            .into_inner();
266
267        let mut stats = Rc::into_inner(self.stats)
268            .expect("no deferred encoders are still allocated")
269            .into_inner();
270
271        // Any remaining dispatched proofs need to have their results collected.
272        // These are proofs that were pre-dispatched but not consumed during proof calculation.
273        for (hashed_address, rx) in &self.dispatched {
274            let wait_start = Instant::now();
275            let result = rx
276                .recv()
277                .map_err(|_| {
278                    StateProofError::Database(DatabaseError::Other(format!(
279                        "Storage proof channel closed for {hashed_address:?}",
280                    )))
281                })?
282                .result?;
283            stats.storage_wait_time += wait_start.elapsed();
284
285            storage_proof_results.insert(*hashed_address, result.proof);
286        }
287
288        Ok((storage_proof_results, stats))
289    }
290}
291
292impl<TC, HC> LeafValueEncoder for AsyncAccountValueEncoder<TC, HC>
293where
294    TC: TrieStorageCursor,
295    HC: HashedStorageCursor<Value = alloy_primitives::U256>,
296{
297    type Value = Account;
298    type DeferredEncoder = AsyncAccountDeferredValueEncoder<TC, HC>;
299
300    fn deferred_encoder(
301        &mut self,
302        hashed_address: B256,
303        account: Self::Value,
304    ) -> Self::DeferredEncoder {
305        // If the proof job has already been dispatched for this account then it's not necessary to
306        // dispatch another.
307        if let Some(rx) = self.dispatched.remove(&hashed_address) {
308            self.stats.borrow_mut().dispatched_count += 1;
309            return AsyncAccountDeferredValueEncoder::Dispatched {
310                hashed_address,
311                account,
312                proof_result_rx: Some(Ok(rx)),
313                storage_proof_results: self.storage_proof_results.clone(),
314                stats: self.stats.clone(),
315                storage_calculator: self.storage_calculator.clone(),
316                cached_storage_roots: self.cached_storage_roots.clone(),
317            }
318        }
319
320        // If the address didn't have a job dispatched for it then we can assume it has no targets,
321        // and we only need its root.
322
323        // If the root is already calculated then just use it directly
324        if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
325            self.stats.borrow_mut().from_cache_count += 1;
326            return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
327        }
328
329        // Compute storage root synchronously using the shared calculator
330        self.stats.borrow_mut().sync_count += 1;
331        AsyncAccountDeferredValueEncoder::Sync {
332            storage_calculator: self.storage_calculator.clone(),
333            hashed_address,
334            account,
335            cached_storage_roots: self.cached_storage_roots.clone(),
336        }
337    }
338}