reth_engine_tree/tree/payload_processor/
sparse_trie.rs1use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
4use alloy_primitives::B256;
5use rayon::iter::{ParallelBridge, ParallelIterator};
6use reth_trie::{updates::TrieUpdates, Nibbles};
7use reth_trie_parallel::root::ParallelStateRootError;
8use reth_trie_sparse::{
9 errors::{SparseStateTrieResult, SparseTrieErrorKind},
10 provider::{TrieNodeProvider, TrieNodeProviderFactory},
11 ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
12};
13use smallvec::SmallVec;
14use std::{
15 sync::mpsc,
16 time::{Duration, Instant},
17};
18use tracing::{debug, debug_span, instrument, trace};
19
/// Task that applies incoming [`SparseTrieUpdate`]s to a [`SparseStateTrie`] and, once the update
/// channel is closed, computes the state root from it.
///
/// `A` and `S` are the two type parameters of the wrapped [`SparseStateTrie`]; both default to
/// [`SerialSparseTrie`].
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
    /// Channel on which the task receives the updates it has to apply.
    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
    /// Sparse state trie the updates are applied to and whose root is computed at the end.
    pub(super) trie: SparseStateTrie<A, S>,
    /// Metrics handle; the task records its update and root-calculation durations into it.
    pub(super) metrics: MultiProofTaskMetrics,
    /// Trie node provider factory that is passed to the trie operations.
    blinded_provider_factory: BPF,
}
35
36impl<BPF, A, S> SparseTrieTask<BPF, A, S>
37where
38 BPF: TrieNodeProviderFactory + Send + Sync + Clone,
39 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
40 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
41 A: SparseTrieInterface + Send + Sync + Default,
42 S: SparseTrieInterface + Send + Sync + Default + Clone,
43{
44 pub(super) fn new_with_cleared_trie(
46 updates: mpsc::Receiver<SparseTrieUpdate>,
47 blinded_provider_factory: BPF,
48 metrics: MultiProofTaskMetrics,
49 sparse_state_trie: ClearedSparseStateTrie<A, S>,
50 ) -> Self {
51 Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
52 }
53
54 #[instrument(
65 level = "debug",
66 target = "engine::tree::payload_processor::sparse_trie",
67 skip_all
68 )]
69 pub(super) fn run(
70 mut self,
71 ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
72 let result = self.run_inner();
74 (result, self.trie)
75 }
76
77 fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
81 let now = Instant::now();
82
83 let mut num_iterations = 0;
84
85 while let Ok(mut update) = self.updates.recv() {
86 num_iterations += 1;
87 let mut num_updates = 1;
88 let _enter =
89 debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
90 .entered();
91 while let Ok(next) = self.updates.try_recv() {
92 update.extend(next);
93 num_updates += 1;
94 }
95 drop(_enter);
96
97 debug!(
98 target: "engine::root",
99 num_updates,
100 account_proofs = update.multiproof.account_subtree.len(),
101 storage_proofs = update.multiproof.storages.len(),
102 "Updating sparse trie"
103 );
104
105 let elapsed =
106 update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
107 .map_err(|e| {
108 ParallelStateRootError::Other(format!(
109 "could not calculate state root: {e:?}"
110 ))
111 })?;
112 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
113 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
114 }
115
116 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
117
118 let start = Instant::now();
119 let (state_root, trie_updates) =
120 self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
121 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
122 })?;
123
124 self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
125 self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
126
127 Ok(StateRootComputeOutcome { state_root, trie_updates })
128 }
129}
130
/// Outcome of the state root computation: the state root together with the trie updates that
/// were produced alongside it.
#[derive(Debug)]
pub struct StateRootComputeOutcome {
    /// The computed state root.
    pub state_root: B256,
    /// The trie updates returned together with the state root.
    pub trie_updates: TrieUpdates,
}
140
141#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
143pub(crate) fn update_sparse_trie<BPF, A, S>(
144 trie: &mut SparseStateTrie<A, S>,
145 SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
146 blinded_provider_factory: &BPF,
147) -> SparseStateTrieResult<Duration>
148where
149 BPF: TrieNodeProviderFactory + Send + Sync,
150 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
151 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
152 A: SparseTrieInterface + Send + Sync + Default,
153 S: SparseTrieInterface + Send + Sync + Default + Clone,
154{
155 trace!(target: "engine::root::sparse", "Updating sparse trie");
156 let started_at = Instant::now();
157
158 trie.reveal_decoded_multiproof(multiproof)?;
160 let reveal_multiproof_elapsed = started_at.elapsed();
161 trace!(
162 target: "engine::root::sparse",
163 ?reveal_multiproof_elapsed,
164 "Done revealing multiproof"
165 );
166
167 let span = tracing::Span::current();
169 let (tx, rx) = mpsc::channel();
170 state
171 .storages
172 .into_iter()
173 .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
174 .par_bridge()
175 .map(|(address, storage, storage_trie)| {
176 let _enter =
177 debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
178 .entered();
179
180 trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
181 let storage_provider = blinded_provider_factory.storage_node_provider(address);
182 let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
183
184 if storage.wiped {
185 trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
186 storage_trie.wipe()?;
187 }
188
189 let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
194
195 for (slot, value) in storage.storage {
196 let slot_nibbles = Nibbles::unpack(slot);
197
198 if value.is_zero() {
199 removed_slots.push(slot_nibbles);
200 continue;
201 }
202
203 trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
204 storage_trie.update_leaf(
205 slot_nibbles,
206 alloy_rlp::encode_fixed_size(&value).to_vec(),
207 &storage_provider,
208 )?;
209 }
210
211 for slot_nibbles in removed_slots {
212 trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
213 storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
214 }
215
216 storage_trie.root();
217
218 SparseStateTrieResult::Ok((address, storage_trie))
219 })
220 .for_each_init(
221 || tx.clone(),
222 |tx, result| {
223 let _ = tx.send(result);
224 },
225 );
226 drop(tx);
227
228 let mut removed_accounts = Vec::new();
233
234 let _enter =
236 tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
237 .entered();
238 for result in rx {
239 let (address, storage_trie) = result?;
240 trie.insert_storage_trie(address, storage_trie);
241
242 if let Some(account) = state.accounts.remove(&address) {
243 trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
246 if !trie.update_account(
247 address,
248 account.unwrap_or_default(),
249 blinded_provider_factory,
250 )? {
251 removed_accounts.push(address);
252 }
253 } else if trie.is_account_revealed(address) {
254 trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
256 if !trie.update_account_storage_root(address, blinded_provider_factory)? {
257 removed_accounts.push(address);
258 }
259 }
260 }
261
262 for (address, account) in state.accounts {
264 trace!(target: "engine::root::sparse", ?address, "Updating account");
265 if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
266 removed_accounts.push(address);
267 }
268 }
269
270 for address in removed_accounts {
272 trace!(target: "engine::root::sparse", ?address, "Removing account");
273 let nibbles = Nibbles::unpack(address);
274 trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
275 }
276
277 let elapsed_before = started_at.elapsed();
278 trace!(
279 target: "engine::root::sparse",
280 "Calculating subtries"
281 );
282 trie.calculate_subtries();
283
284 let elapsed = started_at.elapsed();
285 let below_level_elapsed = elapsed - elapsed_before;
286 trace!(
287 target: "engine::root::sparse",
288 ?below_level_elapsed,
289 "Intermediate nodes calculated"
290 );
291
292 Ok(elapsed)
293}