reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
4use alloy_primitives::B256;
5use rayon::iter::{ParallelBridge, ParallelIterator};
6use reth_trie::{updates::TrieUpdates, Nibbles};
7use reth_trie_parallel::root::ParallelStateRootError;
8use reth_trie_sparse::{
9    errors::{SparseStateTrieResult, SparseTrieErrorKind},
10    provider::{TrieNodeProvider, TrieNodeProviderFactory},
11    ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
12};
13use smallvec::SmallVec;
14use std::{
15    sync::mpsc,
16    time::{Duration, Instant},
17};
18use tracing::{debug, debug_span, instrument, trace};
19
20/// A task responsible for populating the sparse trie.
21pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
22where
23    BPF: TrieNodeProviderFactory + Send + Sync,
24    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
25    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
26{
27    /// Receives updates from the state root task.
28    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
29    /// `SparseStateTrie` used for computing the state root.
30    pub(super) trie: SparseStateTrie<A, S>,
31    pub(super) metrics: MultiProofTaskMetrics,
32    /// Trie node provider factory.
33    blinded_provider_factory: BPF,
34}
35
36impl<BPF, A, S> SparseTrieTask<BPF, A, S>
37where
38    BPF: TrieNodeProviderFactory + Send + Sync + Clone,
39    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
40    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
41    A: SparseTrieInterface + Send + Sync + Default,
42    S: SparseTrieInterface + Send + Sync + Default + Clone,
43{
44    /// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
45    pub(super) fn new_with_cleared_trie(
46        updates: mpsc::Receiver<SparseTrieUpdate>,
47        blinded_provider_factory: BPF,
48        metrics: MultiProofTaskMetrics,
49        sparse_state_trie: ClearedSparseStateTrie<A, S>,
50    ) -> Self {
51        Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
52    }
53
54    /// Runs the sparse trie task to completion.
55    ///
56    /// This waits for new incoming [`SparseTrieUpdate`].
57    ///
58    /// This concludes once the last trie update has been received.
59    ///
60    /// # Returns
61    ///
62    /// - State root computation outcome.
63    /// - `SparseStateTrie` that needs to be cleared and reused to avoid reallocations.
64    #[instrument(
65        level = "debug",
66        target = "engine::tree::payload_processor::sparse_trie",
67        skip_all
68    )]
69    pub(super) fn run(
70        mut self,
71    ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
72        // run the main loop to completion
73        let result = self.run_inner();
74        (result, self.trie)
75    }
76
77    /// Inner function to run the sparse trie task to completion.
78    ///
79    /// See [`Self::run`] for more information.
80    fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
81        let now = Instant::now();
82
83        let mut num_iterations = 0;
84
85        while let Ok(mut update) = self.updates.recv() {
86            num_iterations += 1;
87            let mut num_updates = 1;
88            let _enter =
89                debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
90                    .entered();
91            while let Ok(next) = self.updates.try_recv() {
92                update.extend(next);
93                num_updates += 1;
94            }
95            drop(_enter);
96
97            debug!(
98                target: "engine::root",
99                num_updates,
100                account_proofs = update.multiproof.account_subtree.len(),
101                storage_proofs = update.multiproof.storages.len(),
102                "Updating sparse trie"
103            );
104
105            let elapsed =
106                update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
107                    .map_err(|e| {
108                        ParallelStateRootError::Other(format!(
109                            "could not calculate state root: {e:?}"
110                        ))
111                    })?;
112            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
113            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
114        }
115
116        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
117
118        let start = Instant::now();
119        let (state_root, trie_updates) =
120            self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
121                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
122            })?;
123
124        self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
125        self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
126
127        Ok(StateRootComputeOutcome { state_root, trie_updates })
128    }
129}
130
131/// Outcome of the state root computation, including the state root itself with
132/// the trie updates.
133#[derive(Debug)]
134pub struct StateRootComputeOutcome {
135    /// The state root.
136    pub state_root: B256,
137    /// The trie updates.
138    pub trie_updates: TrieUpdates,
139}
140
141/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
142#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
143pub(crate) fn update_sparse_trie<BPF, A, S>(
144    trie: &mut SparseStateTrie<A, S>,
145    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
146    blinded_provider_factory: &BPF,
147) -> SparseStateTrieResult<Duration>
148where
149    BPF: TrieNodeProviderFactory + Send + Sync,
150    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
151    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
152    A: SparseTrieInterface + Send + Sync + Default,
153    S: SparseTrieInterface + Send + Sync + Default + Clone,
154{
155    trace!(target: "engine::root::sparse", "Updating sparse trie");
156    let started_at = Instant::now();
157
158    // Reveal new accounts and storage slots.
159    trie.reveal_decoded_multiproof(multiproof)?;
160    let reveal_multiproof_elapsed = started_at.elapsed();
161    trace!(
162        target: "engine::root::sparse",
163        ?reveal_multiproof_elapsed,
164        "Done revealing multiproof"
165    );
166
167    // Update storage slots with new values and calculate storage roots.
168    let span = tracing::Span::current();
169    let results: Vec<_> = state
170        .storages
171        .into_iter()
172        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
173        .par_bridge()
174        .map(|(address, storage, storage_trie)| {
175            let _enter =
176                debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
177                    .entered();
178
179            trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
180            let storage_provider = blinded_provider_factory.storage_node_provider(address);
181            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
182
183            if storage.wiped {
184                trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
185                storage_trie.wipe()?;
186            }
187
188            // Defer leaf removals until after updates/additions, so that we don't delete an
189            // intermediate branch node during a removal and then re-add that branch back during a
190            // later leaf addition. This is an optimization, but also a requirement inherited from
191            // multiproof generating, which can't know the order that leaf operations happen in.
192            let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
193
194            for (slot, value) in storage.storage {
195                let slot_nibbles = Nibbles::unpack(slot);
196
197                if value.is_zero() {
198                    removed_slots.push(slot_nibbles);
199                    continue;
200                }
201
202                trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
203                storage_trie.update_leaf(
204                    slot_nibbles,
205                    alloy_rlp::encode_fixed_size(&value).to_vec(),
206                    &storage_provider,
207                )?;
208            }
209
210            for slot_nibbles in removed_slots {
211                trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
212                storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
213            }
214
215            storage_trie.root();
216
217            SparseStateTrieResult::Ok((address, storage_trie))
218        })
219        .collect();
220
221    // Defer leaf removals until after updates/additions, so that we don't delete an intermediate
222    // branch node during a removal and then re-add that branch back during a later leaf addition.
223    // This is an optimization, but also a requirement inherited from multiproof generating, which
224    // can't know the order that leaf operations happen in.
225    let mut removed_accounts = Vec::new();
226
227    // Update account storage roots
228    let _enter =
229        tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
230            .entered();
231    for result in results {
232        let (address, storage_trie) = result?;
233        trie.insert_storage_trie(address, storage_trie);
234
235        if let Some(account) = state.accounts.remove(&address) {
236            // If the account itself has an update, remove it from the state update and update in
237            // one go instead of doing it down below.
238            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
239            if !trie.update_account(
240                address,
241                account.unwrap_or_default(),
242                blinded_provider_factory,
243            )? {
244                removed_accounts.push(address);
245            }
246        } else if trie.is_account_revealed(address) {
247            // Otherwise, if the account is revealed, only update its storage root.
248            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
249            if !trie.update_account_storage_root(address, blinded_provider_factory)? {
250                removed_accounts.push(address);
251            }
252        }
253    }
254
255    // Update accounts
256    for (address, account) in state.accounts {
257        trace!(target: "engine::root::sparse", ?address, "Updating account");
258        if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
259            removed_accounts.push(address);
260        }
261    }
262
263    // Remove accounts
264    for address in removed_accounts {
265        trace!(target: "engine::root::sparse", ?address, "Removing account");
266        let nibbles = Nibbles::unpack(address);
267        trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
268    }
269
270    let elapsed_before = started_at.elapsed();
271    trace!(
272        target: "engine::root::sparse",
273        "Calculating subtries"
274    );
275    trie.calculate_subtries();
276
277    let elapsed = started_at.elapsed();
278    let below_level_elapsed = elapsed - elapsed_before;
279    trace!(
280        target: "engine::root::sparse",
281        ?below_level_elapsed,
282        "Intermediate nodes calculated"
283    );
284
285    Ok(elapsed)
286}