reth_trie_parallel/
value_encoder.rs1use crate::proof_task::{
2 StorageProofInput, StorageProofResult, StorageProofResultMessage, StorageWorkerJob,
3};
4use alloy_primitives::{map::B256Map, B256};
5use alloy_rlp::Encodable;
6use core::cell::RefCell;
7use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
8use dashmap::DashMap;
9use reth_execution_errors::trie::StateProofError;
10use reth_primitives_traits::Account;
11use reth_storage_errors::db::DatabaseError;
12use reth_trie::{
13 proof_v2::{DeferredValueEncoder, LeafValueEncoder, Target},
14 ProofTrieNode,
15};
16use std::{rc::Rc, sync::Arc};
17
18pub(crate) enum AsyncAccountDeferredValueEncoder {
20 Dispatched {
21 hashed_address: B256,
22 account: Account,
23 proof_result_rx: Result<CrossbeamReceiver<StorageProofResultMessage>, DatabaseError>,
24 storage_proof_results: Option<Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>>,
26 },
27 FromCache {
28 account: Account,
29 root: B256,
30 },
31}
32
33impl DeferredValueEncoder for AsyncAccountDeferredValueEncoder {
34 fn encode(self, buf: &mut Vec<u8>) -> Result<(), StateProofError> {
35 let (account, root) = match self {
36 Self::Dispatched {
37 hashed_address,
38 account,
39 proof_result_rx,
40 storage_proof_results,
41 } => {
42 let result = proof_result_rx?
43 .recv()
44 .map_err(|_| {
45 StateProofError::Database(DatabaseError::Other(format!(
46 "Storage proof channel closed for {hashed_address:?}",
47 )))
48 })?
49 .result?;
50
51 let StorageProofResult::V2 { root: Some(root), proof } = result else {
52 panic!("StorageProofResult is not V2 with root: {result:?}")
53 };
54
55 if let Some(storage_proof_results) = storage_proof_results.as_ref() {
56 storage_proof_results.borrow_mut().insert(hashed_address, proof);
57 }
58
59 (account, root)
60 }
61 Self::FromCache { account, root } => (account, root),
62 };
63
64 let account = account.into_trie_account(root);
65 account.encode(buf);
66 Ok(())
67 }
68}
69
70pub(crate) struct AsyncAccountValueEncoder {
75 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
76 dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
78 cached_storage_roots: Arc<DashMap<B256, B256>>,
81 storage_proof_results: Rc<RefCell<B256Map<Vec<ProofTrieNode>>>>,
84}
85
86impl AsyncAccountValueEncoder {
87 pub(crate) fn new(
90 storage_work_tx: CrossbeamSender<StorageWorkerJob>,
91 dispatched: B256Map<CrossbeamReceiver<StorageProofResultMessage>>,
92 cached_storage_roots: Arc<DashMap<B256, B256>>,
93 ) -> Self {
94 Self {
95 storage_work_tx,
96 dispatched,
97 cached_storage_roots,
98 storage_proof_results: Default::default(),
99 }
100 }
101
102 pub(crate) fn into_storage_proofs(
109 self,
110 ) -> Result<B256Map<Vec<ProofTrieNode>>, StateProofError> {
111 let mut storage_proof_results = Rc::into_inner(self.storage_proof_results)
112 .expect("no deferred encoders are still allocated")
113 .into_inner();
114
115 for (hashed_address, rx) in &self.dispatched {
117 let result = rx
118 .recv()
119 .map_err(|_| {
120 StateProofError::Database(DatabaseError::Other(format!(
121 "Storage proof channel closed for {hashed_address:?}",
122 )))
123 })?
124 .result?;
125
126 let StorageProofResult::V2 { proof, .. } = result else {
127 panic!("StorageProofResult is not V2: {result:?}")
128 };
129
130 storage_proof_results.insert(*hashed_address, proof);
131 }
132
133 Ok(storage_proof_results)
134 }
135}
136
137impl LeafValueEncoder for AsyncAccountValueEncoder {
138 type Value = Account;
139 type DeferredEncoder = AsyncAccountDeferredValueEncoder;
140
141 fn deferred_encoder(
142 &mut self,
143 hashed_address: B256,
144 account: Self::Value,
145 ) -> Self::DeferredEncoder {
146 if let Some(rx) = self.dispatched.remove(&hashed_address) {
149 return AsyncAccountDeferredValueEncoder::Dispatched {
150 hashed_address,
151 account,
152 proof_result_rx: Ok(rx),
153 storage_proof_results: Some(self.storage_proof_results.clone()),
154 }
155 }
156
157 if let Some(root) = self.cached_storage_roots.get(&hashed_address) {
162 return AsyncAccountDeferredValueEncoder::FromCache { account, root: *root }
163 }
164
165 let input = StorageProofInput::new(hashed_address, vec![Target::new(B256::ZERO)]);
168 let (tx, rx) = crossbeam_channel::bounded(1);
169
170 let proof_result_rx = self
171 .storage_work_tx
172 .send(StorageWorkerJob::StorageProof { input, proof_result_sender: tx })
173 .map_err(|_| DatabaseError::Other("storage workers unavailable".to_string()))
174 .map(|_| rx);
175
176 AsyncAccountDeferredValueEncoder::Dispatched {
177 hashed_address,
178 account,
179 proof_result_rx,
180 storage_proof_results: None,
181 }
182 }
183}