// reth_engine_tree/tree/payload_processor/sparse_trie.rs

//! Sparse Trie task related functionality.
2
3use crate::tree::payload_processor::{
4    executor::WorkloadExecutor,
5    multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
6};
7use alloy_primitives::B256;
8use rayon::iter::{ParallelBridge, ParallelIterator};
9use reth_trie::{updates::TrieUpdates, Nibbles};
10use reth_trie_parallel::root::ParallelStateRootError;
11use reth_trie_sparse::{
12    blinded::{BlindedProvider, BlindedProviderFactory},
13    errors::{SparseStateTrieResult, SparseTrieErrorKind},
14    SparseStateTrie,
15};
16use std::{
17    sync::mpsc,
18    time::{Duration, Instant},
19};
20use tracing::{debug, trace, trace_span};
21
/// The level below which the sparse trie hashes are calculated in
/// [`update_sparse_trie`].
///
/// It is passed to `calculate_below_level` at the end of every [`update_sparse_trie`] call.
const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
25
26/// A task responsible for populating the sparse trie.
27pub(super) struct SparseTrieTask<BPF> {
28    /// Executor used to spawn subtasks.
29    #[expect(unused)] // TODO use this for spawning trie tasks
30    pub(super) executor: WorkloadExecutor,
31    /// Receives updates from the state root task.
32    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
33    pub(super) blinded_provider_factory: BPF,
34    pub(super) metrics: MultiProofTaskMetrics,
35}
36
37impl<BPF> SparseTrieTask<BPF>
38where
39    BPF: BlindedProviderFactory + Send + Sync,
40    BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
41    BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
42{
43    /// Creates a new sparse trie task.
44    pub(super) const fn new(
45        executor: WorkloadExecutor,
46        updates: mpsc::Receiver<SparseTrieUpdate>,
47        blinded_provider_factory: BPF,
48        metrics: MultiProofTaskMetrics,
49    ) -> Self {
50        Self { executor, updates, blinded_provider_factory, metrics }
51    }
52
53    /// Runs the sparse trie task to completion.
54    ///
55    /// This waits for new incoming [`SparseTrieUpdate`].
56    ///
57    /// This concludes once the last trie update has been received.
58    ///
59    /// NOTE: This function does not take `self` by value to prevent blocking on [`SparseStateTrie`]
60    /// drop.
61    pub(super) fn run(&self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
62        let now = Instant::now();
63
64        let mut num_iterations = 0;
65        let mut trie = SparseStateTrie::new(&self.blinded_provider_factory).with_updates(true);
66
67        while let Ok(mut update) = self.updates.recv() {
68            num_iterations += 1;
69            let mut num_updates = 1;
70            while let Ok(next) = self.updates.try_recv() {
71                update.extend(next);
72                num_updates += 1;
73            }
74
75            debug!(
76                target: "engine::root",
77                num_updates,
78                account_proofs = update.multiproof.account_subtree.len(),
79                storage_proofs = update.multiproof.storages.len(),
80                "Updating sparse trie"
81            );
82
83            let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| {
84                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
85            })?;
86            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
87            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
88        }
89
90        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
91
92        let start = Instant::now();
93        let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| {
94            ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
95        })?;
96
97        self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
98        self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
99
100        Ok(StateRootComputeOutcome { state_root, trie_updates })
101    }
102}
103
104/// Outcome of the state root computation, including the state root itself with
105/// the trie updates.
106#[derive(Debug)]
107pub struct StateRootComputeOutcome {
108    /// The state root.
109    pub state_root: B256,
110    /// The trie updates.
111    pub trie_updates: TrieUpdates,
112}
113
114/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
115pub(crate) fn update_sparse_trie<BPF>(
116    trie: &mut SparseStateTrie<BPF>,
117    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
118) -> SparseStateTrieResult<Duration>
119where
120    BPF: BlindedProviderFactory + Send + Sync,
121    BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
122    BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
123{
124    trace!(target: "engine::root::sparse", "Updating sparse trie");
125    let started_at = Instant::now();
126
127    // Reveal new accounts and storage slots.
128    trie.reveal_multiproof(multiproof)?;
129    let reveal_multiproof_elapsed = started_at.elapsed();
130    trace!(
131        target: "engine::root::sparse",
132        ?reveal_multiproof_elapsed,
133        "Done revealing multiproof"
134    );
135
136    // Update storage slots with new values and calculate storage roots.
137    let (tx, rx) = mpsc::channel();
138    state
139        .storages
140        .into_iter()
141        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
142        .par_bridge()
143        .map(|(address, storage, storage_trie)| {
144            let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
145            let _enter = span.enter();
146            trace!(target: "engine::root::sparse", "Updating storage");
147            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
148
149            if storage.wiped {
150                trace!(target: "engine::root::sparse", "Wiping storage");
151                storage_trie.wipe()?;
152            }
153            for (slot, value) in storage.storage {
154                let slot_nibbles = Nibbles::unpack(slot);
155                if value.is_zero() {
156                    trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
157                    storage_trie.remove_leaf(&slot_nibbles)?;
158                } else {
159                    trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
160                    storage_trie
161                        .update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
162                }
163            }
164
165            storage_trie.root();
166
167            SparseStateTrieResult::Ok((address, storage_trie))
168        })
169        .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
170    drop(tx);
171
172    // Update account storage roots
173    for result in rx {
174        let (address, storage_trie) = result?;
175        trie.insert_storage_trie(address, storage_trie);
176
177        if let Some(account) = state.accounts.remove(&address) {
178            // If the account itself has an update, remove it from the state update and update in
179            // one go instead of doing it down below.
180            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
181            trie.update_account(address, account.unwrap_or_default())?;
182        } else if trie.is_account_revealed(address) {
183            // Otherwise, if the account is revealed, only update its storage root.
184            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
185            trie.update_account_storage_root(address)?;
186        }
187    }
188
189    // Update accounts
190    for (address, account) in state.accounts {
191        trace!(target: "engine::root::sparse", ?address, "Updating account");
192        trie.update_account(address, account.unwrap_or_default())?;
193    }
194
195    let elapsed_before = started_at.elapsed();
196    trace!(
197        target: "engine::root:sparse",
198        level=SPARSE_TRIE_INCREMENTAL_LEVEL,
199        "Calculating intermediate nodes below trie level"
200    );
201    trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
202
203    let elapsed = started_at.elapsed();
204    let below_level_elapsed = elapsed - elapsed_before;
205    trace!(
206        target: "engine::root:sparse",
207        level=SPARSE_TRIE_INCREMENTAL_LEVEL,
208        ?below_level_elapsed,
209        "Intermediate nodes calculated"
210    );
211
212    Ok(elapsed)
213}