reth_engine_tree/tree/payload_processor/
sparse_trie.rs1use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
4use alloy_primitives::B256;
5use rayon::iter::{ParallelBridge, ParallelIterator};
6use reth_trie::{updates::TrieUpdates, Nibbles};
7use reth_trie_parallel::root::ParallelStateRootError;
8use reth_trie_sparse::{
9 errors::{SparseStateTrieResult, SparseTrieErrorKind},
10 provider::{TrieNodeProvider, TrieNodeProviderFactory},
11 ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
12};
13use smallvec::SmallVec;
14use std::{
15 sync::mpsc,
16 time::{Duration, Instant},
17};
18use tracing::{debug, debug_span, instrument, trace};
19
/// A task that receives [`SparseTrieUpdate`]s over a channel, applies them to a
/// [`SparseStateTrie`] and finally computes the state root together with the trie updates.
///
/// `A` and `S` are forwarded unchanged to [`SparseStateTrie`] and both default to
/// [`SerialSparseTrie`].
// NOTE(review): `A` / `S` presumably select the account and storage trie implementations —
// confirm against the `SparseStateTrie` definition.
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
    /// Receiving end of the channel that delivers the sparse trie updates to apply.
    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
    /// The sparse state trie that the received updates are applied to.
    pub(super) trie: SparseStateTrie<A, S>,
    /// Metrics of the task; duration histograms are recorded into it while the task runs.
    pub(super) metrics: MultiProofTaskMetrics,
    /// Trie node provider factory that is passed to the trie operations performed by the task.
    blinded_provider_factory: BPF,
}
35
36impl<BPF, A, S> SparseTrieTask<BPF, A, S>
37where
38 BPF: TrieNodeProviderFactory + Send + Sync + Clone,
39 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
40 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
41 A: SparseTrieInterface + Send + Sync + Default,
42 S: SparseTrieInterface + Send + Sync + Default + Clone,
43{
44 pub(super) fn new_with_cleared_trie(
46 updates: mpsc::Receiver<SparseTrieUpdate>,
47 blinded_provider_factory: BPF,
48 metrics: MultiProofTaskMetrics,
49 sparse_state_trie: ClearedSparseStateTrie<A, S>,
50 ) -> Self {
51 Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
52 }
53
54 #[instrument(
65 level = "debug",
66 target = "engine::tree::payload_processor::sparse_trie",
67 skip_all
68 )]
69 pub(super) fn run(
70 mut self,
71 ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
72 let result = self.run_inner();
74 (result, self.trie)
75 }
76
77 fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
81 let now = Instant::now();
82
83 let mut num_iterations = 0;
84
85 while let Ok(mut update) = self.updates.recv() {
86 num_iterations += 1;
87 let mut num_updates = 1;
88 let _enter =
89 debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
90 .entered();
91 while let Ok(next) = self.updates.try_recv() {
92 update.extend(next);
93 num_updates += 1;
94 }
95 drop(_enter);
96
97 debug!(
98 target: "engine::root",
99 num_updates,
100 account_proofs = update.multiproof.account_subtree.len(),
101 storage_proofs = update.multiproof.storages.len(),
102 "Updating sparse trie"
103 );
104
105 let elapsed =
106 update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
107 .map_err(|e| {
108 ParallelStateRootError::Other(format!(
109 "could not calculate state root: {e:?}"
110 ))
111 })?;
112 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
113 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
114 }
115
116 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
117
118 let start = Instant::now();
119 let (state_root, trie_updates) =
120 self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
121 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
122 })?;
123
124 self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
125 self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
126
127 Ok(StateRootComputeOutcome { state_root, trie_updates })
128 }
129}
130
/// Outcome of a state root computation: the state root together with the trie updates that
/// were returned alongside it.
#[derive(Debug)]
pub struct StateRootComputeOutcome {
    /// The computed state root.
    pub state_root: B256,
    /// The trie updates produced by the computation.
    pub trie_updates: TrieUpdates,
}
140
141#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
143pub(crate) fn update_sparse_trie<BPF, A, S>(
144 trie: &mut SparseStateTrie<A, S>,
145 SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
146 blinded_provider_factory: &BPF,
147) -> SparseStateTrieResult<Duration>
148where
149 BPF: TrieNodeProviderFactory + Send + Sync,
150 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
151 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
152 A: SparseTrieInterface + Send + Sync + Default,
153 S: SparseTrieInterface + Send + Sync + Default + Clone,
154{
155 trace!(target: "engine::root::sparse", "Updating sparse trie");
156 let started_at = Instant::now();
157
158 trie.reveal_decoded_multiproof(multiproof)?;
160 let reveal_multiproof_elapsed = started_at.elapsed();
161 trace!(
162 target: "engine::root::sparse",
163 ?reveal_multiproof_elapsed,
164 "Done revealing multiproof"
165 );
166
167 let span = tracing::Span::current();
169 let results: Vec<_> = state
170 .storages
171 .into_iter()
172 .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
173 .par_bridge()
174 .map(|(address, storage, storage_trie)| {
175 let _enter =
176 debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
177 .entered();
178
179 trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
180 let storage_provider = blinded_provider_factory.storage_node_provider(address);
181 let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
182
183 if storage.wiped {
184 trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
185 storage_trie.wipe()?;
186 }
187
188 let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
193
194 for (slot, value) in storage.storage {
195 let slot_nibbles = Nibbles::unpack(slot);
196
197 if value.is_zero() {
198 removed_slots.push(slot_nibbles);
199 continue;
200 }
201
202 trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
203 storage_trie.update_leaf(
204 slot_nibbles,
205 alloy_rlp::encode_fixed_size(&value).to_vec(),
206 &storage_provider,
207 )?;
208 }
209
210 for slot_nibbles in removed_slots {
211 trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
212 storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
213 }
214
215 storage_trie.root();
216
217 SparseStateTrieResult::Ok((address, storage_trie))
218 })
219 .collect();
220
221 let mut removed_accounts = Vec::new();
226
227 let _enter =
229 tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
230 .entered();
231 for result in results {
232 let (address, storage_trie) = result?;
233 trie.insert_storage_trie(address, storage_trie);
234
235 if let Some(account) = state.accounts.remove(&address) {
236 trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
239 if !trie.update_account(
240 address,
241 account.unwrap_or_default(),
242 blinded_provider_factory,
243 )? {
244 removed_accounts.push(address);
245 }
246 } else if trie.is_account_revealed(address) {
247 trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
249 if !trie.update_account_storage_root(address, blinded_provider_factory)? {
250 removed_accounts.push(address);
251 }
252 }
253 }
254
255 for (address, account) in state.accounts {
257 trace!(target: "engine::root::sparse", ?address, "Updating account");
258 if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
259 removed_accounts.push(address);
260 }
261 }
262
263 for address in removed_accounts {
265 trace!(target: "engine::root::sparse", ?address, "Removing account");
266 let nibbles = Nibbles::unpack(address);
267 trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
268 }
269
270 let elapsed_before = started_at.elapsed();
271 trace!(
272 target: "engine::root::sparse",
273 "Calculating subtries"
274 );
275 trie.calculate_subtries();
276
277 let elapsed = started_at.elapsed();
278 let below_level_elapsed = elapsed - elapsed_before;
279 trace!(
280 target: "engine::root::sparse",
281 ?below_level_elapsed,
282 "Intermediate nodes calculated"
283 );
284
285 Ok(elapsed)
286}