reth_engine_tree/tree/payload_processor/
sparse_trie.rs
1use crate::tree::payload_processor::{
4 executor::WorkloadExecutor,
5 multiproof::{MultiProofConfig, MultiProofTaskMetrics, SparseTrieUpdate},
6};
7use alloy_primitives::B256;
8use rayon::iter::{ParallelBridge, ParallelIterator};
9use reth_provider::{BlockReader, DBProvider, DatabaseProviderFactory, StateCommitmentProvider};
10use reth_trie::{
11 hashed_cursor::HashedPostStateCursorFactory, proof::ProofBlindedProviderFactory,
12 trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, Nibbles,
13};
14use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
15use reth_trie_parallel::root::ParallelStateRootError;
16use reth_trie_sparse::{
17 blinded::{BlindedProvider, BlindedProviderFactory},
18 errors::{SparseStateTrieResult, SparseTrieErrorKind},
19 SparseStateTrie,
20};
21use std::{
22 sync::mpsc,
23 time::{Duration, Instant},
24};
25use tracing::{debug, trace, trace_span};
26
/// Trie level passed to `calculate_below_level` at the end of every `update_sparse_trie` call.
///
/// Intermediate nodes below this level are calculated after each incremental update.
const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
30
31pub(super) struct SparseTrieTask<F> {
33 #[allow(unused)] pub(super) executor: WorkloadExecutor,
36 pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
38 pub(super) config: MultiProofConfig<F>,
40 pub(super) metrics: MultiProofTaskMetrics,
41}
42
43impl<F> SparseTrieTask<F>
44where
45 F: DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider,
46{
47 pub(super) fn run(self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
53 let now = Instant::now();
54 let provider_ro = self.config.consistent_view.provider_ro()?;
55 let in_memory_trie_cursor = InMemoryTrieCursorFactory::new(
56 DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
57 &self.config.nodes_sorted,
58 );
59 let blinded_provider_factory = ProofBlindedProviderFactory::new(
60 in_memory_trie_cursor.clone(),
61 HashedPostStateCursorFactory::new(
62 DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
63 &self.config.state_sorted,
64 ),
65 self.config.prefix_sets.clone(),
66 );
67
68 let mut num_iterations = 0;
69 let mut trie = SparseStateTrie::new(blinded_provider_factory).with_updates(true);
70
71 while let Ok(mut update) = self.updates.recv() {
72 num_iterations += 1;
73 let mut num_updates = 1;
74 while let Ok(next) = self.updates.try_recv() {
75 update.extend(next);
76 num_updates += 1;
77 }
78
79 debug!(
80 target: "engine::root",
81 num_updates,
82 account_proofs = update.multiproof.account_subtree.len(),
83 storage_proofs = update.multiproof.storages.len(),
84 "Updating sparse trie"
85 );
86
87 let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| {
88 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
89 })?;
90 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
91 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
92 }
93
94 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
95
96 let start = Instant::now();
97 let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| {
98 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
99 })?;
100
101 self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
102 self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
103
104 Ok(StateRootComputeOutcome { state_root, trie_updates })
105 }
106}
107
108#[derive(Debug)]
111pub struct StateRootComputeOutcome {
112 pub state_root: B256,
114 pub trie_updates: TrieUpdates,
116}
117
118pub(crate) fn update_sparse_trie<BPF>(
120 trie: &mut SparseStateTrie<BPF>,
121 SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
122) -> SparseStateTrieResult<Duration>
123where
124 BPF: BlindedProviderFactory + Send + Sync,
125 BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
126 BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
127{
128 trace!(target: "engine::root::sparse", "Updating sparse trie");
129 let started_at = Instant::now();
130
131 trie.reveal_multiproof(multiproof)?;
133 let reveal_multiproof_elapsed = started_at.elapsed();
134 trace!(
135 target: "engine::root::sparse",
136 ?reveal_multiproof_elapsed,
137 "Done revealing multiproof"
138 );
139
140 let (tx, rx) = mpsc::channel();
142 state
143 .storages
144 .into_iter()
145 .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
146 .par_bridge()
147 .map(|(address, storage, storage_trie)| {
148 let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
149 let _enter = span.enter();
150 trace!(target: "engine::root::sparse", "Updating storage");
151 let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
152
153 if storage.wiped {
154 trace!(target: "engine::root::sparse", "Wiping storage");
155 storage_trie.wipe()?;
156 }
157 for (slot, value) in storage.storage {
158 let slot_nibbles = Nibbles::unpack(slot);
159 if value.is_zero() {
160 trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
161 storage_trie.remove_leaf(&slot_nibbles)?;
162 } else {
163 trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
164 storage_trie
165 .update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
166 }
167 }
168
169 storage_trie.root();
170
171 SparseStateTrieResult::Ok((address, storage_trie))
172 })
173 .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
174 drop(tx);
175
176 for result in rx {
178 let (address, storage_trie) = result?;
179 trie.insert_storage_trie(address, storage_trie);
180
181 if let Some(account) = state.accounts.remove(&address) {
182 trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
185 trie.update_account(address, account.unwrap_or_default())?;
186 } else if trie.is_account_revealed(address) {
187 trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
189 trie.update_account_storage_root(address)?;
190 }
191 }
192
193 for (address, account) in state.accounts {
195 trace!(target: "engine::root::sparse", ?address, "Updating account");
196 trie.update_account(address, account.unwrap_or_default())?;
197 }
198
199 let elapsed_before = started_at.elapsed();
200 trace!(
201 target: "engine::root:sparse",
202 level=SPARSE_TRIE_INCREMENTAL_LEVEL,
203 "Calculating intermediate nodes below trie level"
204 );
205 trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
206
207 let elapsed = started_at.elapsed();
208 let below_level_elapsed = elapsed - elapsed_before;
209 trace!(
210 target: "engine::root:sparse",
211 level=SPARSE_TRIE_INCREMENTAL_LEVEL,
212 ?below_level_elapsed,
213 "Intermediate nodes calculated"
214 );
215
216 Ok(elapsed)
217}