reth_trie_parallel/
value_encoder.rs

1use crate::proof_task::{
2    StorageProofInput, StorageProofResult, StorageProofResultMessage, StorageWorkerJob,
3};
4use alloy_primitives::{map::B256Map, B256};
5use alloy_rlp::Encodable;
6use core::cell::RefCell;
7use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
8use dashmap::DashMap;
9use reth_execution_errors::trie::StateProofError;
10use reth_primitives_traits::Account;
11use reth_storage_errors::db::DatabaseError;
12use reth_trie::{
13    proof_v2::{DeferredValueEncoder, LeafValueEncoder, Target},
14    ProofTrieNode,
15};
16use std::{rc::Rc, sync::Arc};
17
18/// Returned from [`AsyncAccountValueEncoder`], used to track an async storage root calculation.
19pub(crate) enum AsyncAccountDeferredValueEncoder {
20    Dispatched {
21        hashed_address: B256,
22        account: Account,
23        proof_result_rx: Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>,
24        // None if results shouldn't be retained for this dispatched proof.
25        storage_proof_results: Option<Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>>,
26    },
27    FromCache {
28        account: Account,
29        root: B256,
30    },
31}
32
33impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
34    fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
35        let (account, root) = match self {
36            Self::Dispatched {
37                hashed_address,
38                account,
39                proof_result_rx,
40                storage_proof_results,
41            } => {
42                let result = proof_result_rx?
43                    .recv()
44                    .map_err(|_| {
45                        StateProofError::Database(DatabaseError::Other(format!(
46                            "Storage proof channel closed for {hashed_address:?}",
47                        )))
48                    })?
49                    .result?;
50
51                let StorageProofResult::V2 { root: Some(root), proof } = result else {
52                    panic!("StorageProofResult is not V2 with root: {result:?}")
53                };
54
55                if let Some(storage_proof_results) = storage_proof_results.as_ref() {
56                    storage_proof_results.borrow_mut().insert(hashed_address, proof);
57                }
58
59                (account, root)
60            }
61            Self::FromCache { account, root } => (account, root),
62        };
63
64        let account = account.into_trie_account(root);
65        account.encode(buf);
66        Ok(())
67    }
68}
69
70/// Implements the [`LeafValueEncoder`] trait for accounts using a [`CrossbeamSender`] to dispatch
71/// and compute storage roots asynchronously. Can also accept a set of already dispatched account
72/// storage proofs, for cases where it's possible to determine some necessary accounts ahead of
73/// time.
74pub(crate) struct AsyncAccountValueEncoder {
75    storage_work_tx: CrossbeamSender<StorageWorkerJob>,
76    /// Storage proof jobs which were dispatched ahead of time.
77    dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
78    /// Storage roots which have already been computed. This can be used only if a storage proof
79    /// wasn't dispatched for an account, otherwise we must consume the proof result.
80    cached_storage_roots: Arc<DashMap<B256, B256>>,
81    /// Tracks storage proof results received from the storage workers. [`Rc`] + [`RefCell`] is
82    /// required because [`DeferredValueEncoder`] cannot have a lifetime.
83    storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
84}
85
86impl AsyncAccountValueEncoder {
87    /// Initializes a [`Self`] using a `ProofWorkerHandle` which will be used to calculate storage
88    /// roots asynchronously.
89    pub(crate) fn new(
90        storage_work_tx: CrossbeamSender<StorageWorkerJob>,
91        dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
92        cached_storage_roots: Arc<DashMap<B256, B256>>,
93    ) -> Self {
94        Self {
95            storage_work_tx,
96            dispatched,
97            cached_storage_roots,
98            storage_proof_results: Default::default(),
99        }
100    }
101
102    /// Consume [`Self`] and return all collected storage proofs which had been dispatched.
103    ///
104    /// # Panics
105    ///
106    /// This method panics if any deferred encoders produced by [`Self::deferred_encoder`] have not
107    /// been dropped.
108    pub(crate) fn into_storage_proofs(
109        self,
110    ) -> Result<B256Map<Vec<ProofTrieNode>>, StateProofError> {
111        let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
112            .expect("no deferred encoders are still allocated")
113            .into_inner();
114
115        // Any remaining dispatched proofs need to have their results collected
116        for (hashed_address, rx) in &self.dispatched {
117            let result = rx
118                .recv()
119                .map_err(|_| {
120                    StateProofError::Database(DatabaseError::Other(format!(
121                        "Storage proof channel closed for {hashed_address:?}",
122                    )))
123                })?
124                .result?;
125
126            let StorageProofResult::V2 { proof, .. } = result else {
127                panic!("StorageProofResult is not V2: {result:?}")
128            };
129
130            storage_proof_results.insert(*hashed_address, proof);
131        }
132
133        Ok(storage_proof_results)
134    }
135}
136
137impl LeafValueEncoder for AsyncAccountValueEncoder {
138    type Value = Account;
139    type DeferredEncoder = AsyncAccountDeferredValueEncoder;
140
141    fn deferred_encoder(
142        &mut self,
143        hashed_address: B256,
144        account: Self::Value,
145    ) -> Self::DeferredEncoder {
146        // If the proof job has already been dispatched for this account then it's not necessary to
147        // dispatch another.
148        if let Some(rx) = self.dispatched.remove(&hashed_address) {
149            return AsyncAccountDeferredValueEncoder::Dispatched {
150                hashed_address,
151                account,
152                proof_result_rx: Ok(rx),
153                storage_proof_results: Some(self.storage_proof_results.clone()),
154            }
155        }
156
157        // If the address didn't have a job dispatched for it then we can assume it has no targets,
158        // and we only need its root.
159
160        // If the root is already calculated then just use it directly
161        if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
162            return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
163        }
164
165        // Create a proof input which targets a bogus key, so that we calculate the root as a
166        // side-effect.
167        let input = StorageProofInput::new(hashed_address, vec![Target::new(B256::ZERO)]);
168        let (tx, rx) = crossbeam_channel::bounded(1);
169
170        let proof_result_rx = self
171            .storage_work_tx
172            .send(StorageWorkerJob::StorageProof { input, proof_result_sender: tx })
173            .map_err(|_| DatabaseError::Other("storage workers unavailable".to_string()))
174            .map(|_| rx);
175
176        AsyncAccountDeferredValueEncoder::Dispatched {
177            hashed_address,
178            account,
179            proof_result_rx,
180            storage_proof_results: None,
181        }
182    }
183}