reth_engine_tree/tree/payload_processor/
sparse_trie.rs
1use crate::tree::payload_processor::{
4 executor::WorkloadExecutor,
5 multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
6};
7use alloy_primitives::B256;
8use rayon::iter::{ParallelBridge, ParallelIterator};
9use reth_trie::{updates::TrieUpdates, Nibbles};
10use reth_trie_parallel::root::ParallelStateRootError;
11use reth_trie_sparse::{
12 blinded::{BlindedProvider, BlindedProviderFactory},
13 errors::{SparseStateTrieResult, SparseTrieErrorKind},
14 SparseStateTrie,
15};
16use std::{
17 sync::mpsc,
18 time::{Duration, Instant},
19};
20use tracing::{debug, trace, trace_span};
21
22const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
25
26pub(super) struct SparseTrieTask<BPF> {
28 #[expect(unused)] pub(super) executor: WorkloadExecutor,
31 pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
33 pub(super) blinded_provider_factory: BPF,
34 pub(super) metrics: MultiProofTaskMetrics,
35}
36
37impl<BPF> SparseTrieTask<BPF>
38where
39 BPF: BlindedProviderFactory + Send + Sync,
40 BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
41 BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
42{
43 pub(super) const fn new(
45 executor: WorkloadExecutor,
46 updates: mpsc::Receiver<SparseTrieUpdate>,
47 blinded_provider_factory: BPF,
48 metrics: MultiProofTaskMetrics,
49 ) -> Self {
50 Self { executor, updates, blinded_provider_factory, metrics }
51 }
52
53 pub(super) fn run(&self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
62 let now = Instant::now();
63
64 let mut num_iterations = 0;
65 let mut trie = SparseStateTrie::new(&self.blinded_provider_factory).with_updates(true);
66
67 while let Ok(mut update) = self.updates.recv() {
68 num_iterations += 1;
69 let mut num_updates = 1;
70 while let Ok(next) = self.updates.try_recv() {
71 update.extend(next);
72 num_updates += 1;
73 }
74
75 debug!(
76 target: "engine::root",
77 num_updates,
78 account_proofs = update.multiproof.account_subtree.len(),
79 storage_proofs = update.multiproof.storages.len(),
80 "Updating sparse trie"
81 );
82
83 let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| {
84 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
85 })?;
86 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
87 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
88 }
89
90 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
91
92 let start = Instant::now();
93 let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| {
94 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
95 })?;
96
97 self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
98 self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
99
100 Ok(StateRootComputeOutcome { state_root, trie_updates })
101 }
102}
103
104#[derive(Debug)]
107pub struct StateRootComputeOutcome {
108 pub state_root: B256,
110 pub trie_updates: TrieUpdates,
112}
113
114pub(crate) fn update_sparse_trie<BPF>(
116 trie: &mut SparseStateTrie<BPF>,
117 SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
118) -> SparseStateTrieResult<Duration>
119where
120 BPF: BlindedProviderFactory + Send + Sync,
121 BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
122 BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
123{
124 trace!(target: "engine::root::sparse", "Updating sparse trie");
125 let started_at = Instant::now();
126
127 trie.reveal_multiproof(multiproof)?;
129 let reveal_multiproof_elapsed = started_at.elapsed();
130 trace!(
131 target: "engine::root::sparse",
132 ?reveal_multiproof_elapsed,
133 "Done revealing multiproof"
134 );
135
136 let (tx, rx) = mpsc::channel();
138 state
139 .storages
140 .into_iter()
141 .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
142 .par_bridge()
143 .map(|(address, storage, storage_trie)| {
144 let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
145 let _enter = span.enter();
146 trace!(target: "engine::root::sparse", "Updating storage");
147 let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
148
149 if storage.wiped {
150 trace!(target: "engine::root::sparse", "Wiping storage");
151 storage_trie.wipe()?;
152 }
153 for (slot, value) in storage.storage {
154 let slot_nibbles = Nibbles::unpack(slot);
155 if value.is_zero() {
156 trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
157 storage_trie.remove_leaf(&slot_nibbles)?;
158 } else {
159 trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
160 storage_trie
161 .update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
162 }
163 }
164
165 storage_trie.root();
166
167 SparseStateTrieResult::Ok((address, storage_trie))
168 })
169 .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
170 drop(tx);
171
172 for result in rx {
174 let (address, storage_trie) = result?;
175 trie.insert_storage_trie(address, storage_trie);
176
177 if let Some(account) = state.accounts.remove(&address) {
178 trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
181 trie.update_account(address, account.unwrap_or_default())?;
182 } else if trie.is_account_revealed(address) {
183 trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
185 trie.update_account_storage_root(address)?;
186 }
187 }
188
189 for (address, account) in state.accounts {
191 trace!(target: "engine::root::sparse", ?address, "Updating account");
192 trie.update_account(address, account.unwrap_or_default())?;
193 }
194
195 let elapsed_before = started_at.elapsed();
196 trace!(
197 target: "engine::root:sparse",
198 level=SPARSE_TRIE_INCREMENTAL_LEVEL,
199 "Calculating intermediate nodes below trie level"
200 );
201 trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
202
203 let elapsed = started_at.elapsed();
204 let below_level_elapsed = elapsed - elapsed_before;
205 trace!(
206 target: "engine::root:sparse",
207 level=SPARSE_TRIE_INCREMENTAL_LEVEL,
208 ?below_level_elapsed,
209 "Intermediate nodes calculated"
210 );
211
212 Ok(elapsed)
213}