reth_engine_tree/tree/payload_processor/
sparse_trie.rs

//! Sparse Trie task related functionality.
2
3use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
4use alloy_primitives::B256;
5use rayon::iter::{ParallelBridge, ParallelIterator};
6use reth_trie::{updates::TrieUpdates, Nibbles};
7use reth_trie_parallel::root::ParallelStateRootError;
8use reth_trie_sparse::{
9    errors::{SparseStateTrieResult, SparseTrieErrorKind},
10    provider::{TrieNodeProvider, TrieNodeProviderFactory},
11    ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
12};
13use smallvec::SmallVec;
14use std::{
15    sync::mpsc,
16    time::{Duration, Instant},
17};
18use tracing::{debug, trace, trace_span};
19
20/// A task responsible for populating the sparse trie.
21pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
22where
23    BPF: TrieNodeProviderFactory + Send + Sync,
24    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
25    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
26{
27    /// Receives updates from the state root task.
28    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
29    /// `SparseStateTrie` used for computing the state root.
30    pub(super) trie: SparseStateTrie<A, S>,
31    pub(super) metrics: MultiProofTaskMetrics,
32    /// Trie node provider factory.
33    blinded_provider_factory: BPF,
34}
35
36impl<BPF, A, S> SparseTrieTask<BPF, A, S>
37where
38    BPF: TrieNodeProviderFactory + Send + Sync + Clone,
39    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
40    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
41    A: SparseTrieInterface + Send + Sync + Default,
42    S: SparseTrieInterface + Send + Sync + Default,
43{
44    /// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
45    pub(super) fn new_with_cleared_trie(
46        updates: mpsc::Receiver<SparseTrieUpdate>,
47        blinded_provider_factory: BPF,
48        metrics: MultiProofTaskMetrics,
49        sparse_state_trie: ClearedSparseStateTrie<A, S>,
50    ) -> Self {
51        Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
52    }
53
54    /// Runs the sparse trie task to completion.
55    ///
56    /// This waits for new incoming [`SparseTrieUpdate`].
57    ///
58    /// This concludes once the last trie update has been received.
59    ///
60    /// # Returns
61    ///
62    /// - State root computation outcome.
63    /// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
64    pub(super) fn run(
65        mut self,
66    ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
67        // run the main loop to completion
68        let result = self.run_inner();
69        (result, self.trie)
70    }
71
72    /// Inner function to run the sparse trie task to completion.
73    ///
74    /// See [`Self::run`] for more information.
75    fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
76        let now = Instant::now();
77
78        let mut num_iterations = 0;
79
80        while let Ok(mut update) = self.updates.recv() {
81            num_iterations += 1;
82            let mut num_updates = 1;
83            while let Ok(next) = self.updates.try_recv() {
84                update.extend(next);
85                num_updates += 1;
86            }
87
88            debug!(
89                target: "engine::root",
90                num_updates,
91                account_proofs = update.multiproof.account_subtree.len(),
92                storage_proofs = update.multiproof.storages.len(),
93                "Updating sparse trie"
94            );
95
96            let elapsed =
97                update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
98                    .map_err(|e| {
99                        ParallelStateRootError::Other(format!(
100                            "could not calculate state root: {e:?}"
101                        ))
102                    })?;
103            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
104            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
105        }
106
107        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
108
109        let start = Instant::now();
110        let (state_root, trie_updates) =
111            self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
112                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
113            })?;
114
115        self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
116        self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
117
118        Ok(StateRootComputeOutcome { state_root, trie_updates })
119    }
120}
121
122/// Outcome of the state root computation, including the state root itself with
123/// the trie updates.
124#[derive(Debug)]
125pub struct StateRootComputeOutcome {
126    /// The state root.
127    pub state_root: B256,
128    /// The trie updates.
129    pub trie_updates: TrieUpdates,
130}
131
132/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
133pub(crate) fn update_sparse_trie<BPF, A, S>(
134    trie: &mut SparseStateTrie<A, S>,
135    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
136    blinded_provider_factory: &BPF,
137) -> SparseStateTrieResult<Duration>
138where
139    BPF: TrieNodeProviderFactory + Send + Sync,
140    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
141    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
142    A: SparseTrieInterface + Send + Sync + Default,
143    S: SparseTrieInterface + Send + Sync + Default,
144{
145    trace!(target: "engine::root::sparse", "Updating sparse trie");
146    let started_at = Instant::now();
147
148    // Reveal new accounts and storage slots.
149    trie.reveal_decoded_multiproof(multiproof)?;
150    let reveal_multiproof_elapsed = started_at.elapsed();
151    trace!(
152        target: "engine::root::sparse",
153        ?reveal_multiproof_elapsed,
154        "Done revealing multiproof"
155    );
156
157    // Update storage slots with new values and calculate storage roots.
158    let (tx, rx) = mpsc::channel();
159    state
160        .storages
161        .into_iter()
162        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
163        .par_bridge()
164        .map(|(address, storage, storage_trie)| {
165            let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
166            let _enter = span.enter();
167            trace!(target: "engine::root::sparse", "Updating storage");
168            let storage_provider = blinded_provider_factory.storage_node_provider(address);
169            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
170
171            if storage.wiped {
172                trace!(target: "engine::root::sparse", "Wiping storage");
173                storage_trie.wipe()?;
174            }
175
176            // Defer leaf removals until after updates/additions, so that we don't delete an
177            // intermediate branch node during a removal and then re-add that branch back during a
178            // later leaf addition. This is an optimization, but also a requirement inherited from
179            // multiproof generating, which can't know the order that leaf operations happen in.
180            let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
181
182            for (slot, value) in storage.storage {
183                let slot_nibbles = Nibbles::unpack(slot);
184
185                if value.is_zero() {
186                    removed_slots.push(slot_nibbles);
187                    continue;
188                }
189
190                trace!(target: "engine::root::sparse", ?slot_nibbles, "Updating storage slot");
191                storage_trie.update_leaf(
192                    slot_nibbles,
193                    alloy_rlp::encode_fixed_size(&value).to_vec(),
194                    &storage_provider,
195                )?;
196            }
197
198            for slot_nibbles in removed_slots {
199                trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
200                storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
201            }
202
203            storage_trie.root();
204
205            SparseStateTrieResult::Ok((address, storage_trie))
206        })
207        .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
208    drop(tx);
209
210    // Defer leaf removals until after updates/additions, so that we don't delete an intermediate
211    // branch node during a removal and then re-add that branch back during a later leaf addition.
212    // This is an optimization, but also a requirement inherited from multiproof generating, which
213    // can't know the order that leaf operations happen in.
214    let mut removed_accounts = Vec::new();
215
216    // Update account storage roots
217    for result in rx {
218        let (address, storage_trie) = result?;
219        trie.insert_storage_trie(address, storage_trie);
220
221        if let Some(account) = state.accounts.remove(&address) {
222            // If the account itself has an update, remove it from the state update and update in
223            // one go instead of doing it down below.
224            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
225            if !trie.update_account(
226                address,
227                account.unwrap_or_default(),
228                blinded_provider_factory,
229            )? {
230                removed_accounts.push(address);
231            }
232        } else if trie.is_account_revealed(address) {
233            // Otherwise, if the account is revealed, only update its storage root.
234            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
235            if !trie.update_account_storage_root(address, blinded_provider_factory)? {
236                removed_accounts.push(address);
237            }
238        }
239    }
240
241    // Update accounts
242    for (address, account) in state.accounts {
243        trace!(target: "engine::root::sparse", ?address, "Updating account");
244        if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
245            removed_accounts.push(address);
246        }
247    }
248
249    // Remove accounts
250    for address in removed_accounts {
251        trace!(target: "trie::sparse", ?address, "Removing account");
252        let nibbles = Nibbles::unpack(address);
253        trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
254    }
255
256    let elapsed_before = started_at.elapsed();
257    trace!(
258        target: "engine::root::sparse",
259        "Calculating subtries"
260    );
261    trie.calculate_subtries();
262
263    let elapsed = started_at.elapsed();
264    let below_level_elapsed = elapsed - elapsed_before;
265    trace!(
266        target: "engine::root::sparse",
267        ?below_level_elapsed,
268        "Intermediate nodes calculated"
269    );
270
271    Ok(elapsed)
272}