reth_engine_tree/tree/payload_processor/
sparse_trie.rs1use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
4use alloy_primitives::B256;
5use rayon::iter::{ParallelBridge, ParallelIterator};
6use reth_trie::{updates::TrieUpdates, Nibbles};
7use reth_trie_parallel::root::ParallelStateRootError;
8use reth_trie_sparse::{
9 errors::{SparseStateTrieResult, SparseTrieErrorKind},
10 provider::{TrieNodeProvider, TrieNodeProviderFactory},
11 ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
12};
13use smallvec::SmallVec;
14use std::{
15 sync::mpsc,
16 time::{Duration, Instant},
17};
18use tracing::{debug, trace, trace_span};
19
20pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
22where
23 BPF: TrieNodeProviderFactory + Send + Sync,
24 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
25 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
26{
27 pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
29 pub(super) trie: SparseStateTrie<A, S>,
31 pub(super) metrics: MultiProofTaskMetrics,
32 blinded_provider_factory: BPF,
34}
35
36impl<BPF, A, S> SparseTrieTask<BPF, A, S>
37where
38 BPF: TrieNodeProviderFactory + Send + Sync + Clone,
39 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
40 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
41 A: SparseTrieInterface + Send + Sync + Default,
42 S: SparseTrieInterface + Send + Sync + Default,
43{
44 pub(super) fn new_with_cleared_trie(
46 updates: mpsc::Receiver<SparseTrieUpdate>,
47 blinded_provider_factory: BPF,
48 metrics: MultiProofTaskMetrics,
49 sparse_state_trie: ClearedSparseStateTrie<A, S>,
50 ) -> Self {
51 Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
52 }
53
54 pub(super) fn run(
65 mut self,
66 ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
67 let result = self.run_inner();
69 (result, self.trie)
70 }
71
72 fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
76 let now = Instant::now();
77
78 let mut num_iterations = 0;
79
80 while let Ok(mut update) = self.updates.recv() {
81 num_iterations += 1;
82 let mut num_updates = 1;
83 while let Ok(next) = self.updates.try_recv() {
84 update.extend(next);
85 num_updates += 1;
86 }
87
88 debug!(
89 target: "engine::root",
90 num_updates,
91 account_proofs = update.multiproof.account_subtree.len(),
92 storage_proofs = update.multiproof.storages.len(),
93 "Updating sparse trie"
94 );
95
96 let elapsed =
97 update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
98 .map_err(|e| {
99 ParallelStateRootError::Other(format!(
100 "could not calculate state root: {e:?}"
101 ))
102 })?;
103 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
104 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
105 }
106
107 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
108
109 let start = Instant::now();
110 let (state_root, trie_updates) =
111 self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
112 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
113 })?;
114
115 self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
116 self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
117
118 Ok(StateRootComputeOutcome { state_root, trie_updates })
119 }
120}
121
122#[derive(Debug)]
125pub struct StateRootComputeOutcome {
126 pub state_root: B256,
128 pub trie_updates: TrieUpdates,
130}
131
132pub(crate) fn update_sparse_trie<BPF, A, S>(
134 trie: &mut SparseStateTrie<A, S>,
135 SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
136 blinded_provider_factory: &BPF,
137) -> SparseStateTrieResult<Duration>
138where
139 BPF: TrieNodeProviderFactory + Send + Sync,
140 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
141 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
142 A: SparseTrieInterface + Send + Sync + Default,
143 S: SparseTrieInterface + Send + Sync + Default,
144{
145 trace!(target: "engine::root::sparse", "Updating sparse trie");
146 let started_at = Instant::now();
147
148 trie.reveal_decoded_multiproof(multiproof)?;
150 let reveal_multiproof_elapsed = started_at.elapsed();
151 trace!(
152 target: "engine::root::sparse",
153 ?reveal_multiproof_elapsed,
154 "Done revealing multiproof"
155 );
156
157 let (tx, rx) = mpsc::channel();
159 state
160 .storages
161 .into_iter()
162 .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
163 .par_bridge()
164 .map(|(address, storage, storage_trie)| {
165 let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
166 let _enter = span.enter();
167 trace!(target: "engine::root::sparse", "Updating storage");
168 let storage_provider = blinded_provider_factory.storage_node_provider(address);
169 let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
170
171 if storage.wiped {
172 trace!(target: "engine::root::sparse", "Wiping storage");
173 storage_trie.wipe()?;
174 }
175
176 let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
181
182 for (slot, value) in storage.storage {
183 let slot_nibbles = Nibbles::unpack(slot);
184
185 if value.is_zero() {
186 removed_slots.push(slot_nibbles);
187 continue;
188 }
189
190 trace!(target: "engine::root::sparse", ?slot_nibbles, "Updating storage slot");
191 storage_trie.update_leaf(
192 slot_nibbles,
193 alloy_rlp::encode_fixed_size(&value).to_vec(),
194 &storage_provider,
195 )?;
196 }
197
198 for slot_nibbles in removed_slots {
199 trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
200 storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
201 }
202
203 storage_trie.root();
204
205 SparseStateTrieResult::Ok((address, storage_trie))
206 })
207 .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
208 drop(tx);
209
210 let mut removed_accounts = Vec::new();
215
216 for result in rx {
218 let (address, storage_trie) = result?;
219 trie.insert_storage_trie(address, storage_trie);
220
221 if let Some(account) = state.accounts.remove(&address) {
222 trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
225 if !trie.update_account(
226 address,
227 account.unwrap_or_default(),
228 blinded_provider_factory,
229 )? {
230 removed_accounts.push(address);
231 }
232 } else if trie.is_account_revealed(address) {
233 trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
235 if !trie.update_account_storage_root(address, blinded_provider_factory)? {
236 removed_accounts.push(address);
237 }
238 }
239 }
240
241 for (address, account) in state.accounts {
243 trace!(target: "engine::root::sparse", ?address, "Updating account");
244 if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
245 removed_accounts.push(address);
246 }
247 }
248
249 for address in removed_accounts {
251 trace!(target: "trie::sparse", ?address, "Removing account");
252 let nibbles = Nibbles::unpack(address);
253 trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
254 }
255
256 let elapsed_before = started_at.elapsed();
257 trace!(
258 target: "engine::root::sparse",
259 "Calculating subtries"
260 );
261 trie.calculate_subtries();
262
263 let elapsed = started_at.elapsed();
264 let below_level_elapsed = elapsed - elapsed_before;
265 trace!(
266 target: "engine::root::sparse",
267 ?below_level_elapsed,
268 "Intermediate nodes calculated"
269 );
270
271 Ok(elapsed)
272}