reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use crate::tree::payload_processor::{
4    executor::WorkloadExecutor,
5    multiproof::{MultiProofConfig, MultiProofTaskMetrics, SparseTrieUpdate},
6};
7use alloy_primitives::B256;
8use rayon::iter::{ParallelBridge, ParallelIterator};
9use reth_provider::{BlockReader, DBProvider, DatabaseProviderFactory, StateCommitmentProvider};
10use reth_trie::{
11    hashed_cursor::HashedPostStateCursorFactory, proof::ProofBlindedProviderFactory,
12    trie_cursor::InMemoryTrieCursorFactory, updates::TrieUpdates, Nibbles,
13};
14use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
15use reth_trie_parallel::root::ParallelStateRootError;
16use reth_trie_sparse::{
17    blinded::{BlindedProvider, BlindedProviderFactory},
18    errors::{SparseStateTrieResult, SparseTrieErrorKind},
19    SparseStateTrie,
20};
21use std::{
22    sync::mpsc,
23    time::{Duration, Instant},
24};
25use tracing::{debug, trace, trace_span};
26
27/// The level below which the sparse trie hashes are calculated in
28/// [`update_sparse_trie`].
29const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
30
31/// A task responsible for populating the sparse trie.
32pub(super) struct SparseTrieTask<F> {
33    /// Executor used to spawn subtasks.
34    #[allow(unused)] // TODO use this for spawning trie tasks
35    pub(super) executor: WorkloadExecutor,
36    /// Receives updates from the state root task.
37    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
38    // TODO: ideally we need a way to create multiple readers on demand.
39    pub(super) config: MultiProofConfig<F>,
40    pub(super) metrics: MultiProofTaskMetrics,
41}
42
43impl<F> SparseTrieTask<F>
44where
45    F: DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider,
46{
47    /// Runs the sparse trie task to completion.
48    ///
49    /// This waits for new incoming [`SparseTrieUpdate`].
50    ///
51    /// This concludes once the last trie update has been received.
52    pub(super) fn run(self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
53        let now = Instant::now();
54        let provider_ro = self.config.consistent_view.provider_ro()?;
55        let in_memory_trie_cursor = InMemoryTrieCursorFactory::new(
56            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
57            &self.config.nodes_sorted,
58        );
59        let blinded_provider_factory = ProofBlindedProviderFactory::new(
60            in_memory_trie_cursor.clone(),
61            HashedPostStateCursorFactory::new(
62                DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
63                &self.config.state_sorted,
64            ),
65            self.config.prefix_sets.clone(),
66        );
67
68        let mut num_iterations = 0;
69        let mut trie = SparseStateTrie::new(blinded_provider_factory).with_updates(true);
70
71        while let Ok(mut update) = self.updates.recv() {
72            num_iterations += 1;
73            let mut num_updates = 1;
74            while let Ok(next) = self.updates.try_recv() {
75                update.extend(next);
76                num_updates += 1;
77            }
78
79            debug!(
80                target: "engine::root",
81                num_updates,
82                account_proofs = update.multiproof.account_subtree.len(),
83                storage_proofs = update.multiproof.storages.len(),
84                "Updating sparse trie"
85            );
86
87            let elapsed = update_sparse_trie(&mut trie, update).map_err(|e| {
88                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
89            })?;
90            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
91            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
92        }
93
94        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
95
96        let start = Instant::now();
97        let (state_root, trie_updates) = trie.root_with_updates().map_err(|e| {
98            ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
99        })?;
100
101        self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
102        self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
103
104        Ok(StateRootComputeOutcome { state_root, trie_updates })
105    }
106}
107
108/// Outcome of the state root computation, including the state root itself with
109/// the trie updates.
110#[derive(Debug)]
111pub struct StateRootComputeOutcome {
112    /// The state root.
113    pub state_root: B256,
114    /// The trie updates.
115    pub trie_updates: TrieUpdates,
116}
117
118/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
119pub(crate) fn update_sparse_trie<BPF>(
120    trie: &mut SparseStateTrie<BPF>,
121    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
122) -> SparseStateTrieResult<Duration>
123where
124    BPF: BlindedProviderFactory + Send + Sync,
125    BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
126    BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
127{
128    trace!(target: "engine::root::sparse", "Updating sparse trie");
129    let started_at = Instant::now();
130
131    // Reveal new accounts and storage slots.
132    trie.reveal_multiproof(multiproof)?;
133    let reveal_multiproof_elapsed = started_at.elapsed();
134    trace!(
135        target: "engine::root::sparse",
136        ?reveal_multiproof_elapsed,
137        "Done revealing multiproof"
138    );
139
140    // Update storage slots with new values and calculate storage roots.
141    let (tx, rx) = mpsc::channel();
142    state
143        .storages
144        .into_iter()
145        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
146        .par_bridge()
147        .map(|(address, storage, storage_trie)| {
148            let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
149            let _enter = span.enter();
150            trace!(target: "engine::root::sparse", "Updating storage");
151            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
152
153            if storage.wiped {
154                trace!(target: "engine::root::sparse", "Wiping storage");
155                storage_trie.wipe()?;
156            }
157            for (slot, value) in storage.storage {
158                let slot_nibbles = Nibbles::unpack(slot);
159                if value.is_zero() {
160                    trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
161                    storage_trie.remove_leaf(&slot_nibbles)?;
162                } else {
163                    trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
164                    storage_trie
165                        .update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
166                }
167            }
168
169            storage_trie.root();
170
171            SparseStateTrieResult::Ok((address, storage_trie))
172        })
173        .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
174    drop(tx);
175
176    // Update account storage roots
177    for result in rx {
178        let (address, storage_trie) = result?;
179        trie.insert_storage_trie(address, storage_trie);
180
181        if let Some(account) = state.accounts.remove(&address) {
182            // If the account itself has an update, remove it from the state update and update in
183            // one go instead of doing it down below.
184            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
185            trie.update_account(address, account.unwrap_or_default())?;
186        } else if trie.is_account_revealed(address) {
187            // Otherwise, if the account is revealed, only update its storage root.
188            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
189            trie.update_account_storage_root(address)?;
190        }
191    }
192
193    // Update accounts
194    for (address, account) in state.accounts {
195        trace!(target: "engine::root::sparse", ?address, "Updating account");
196        trie.update_account(address, account.unwrap_or_default())?;
197    }
198
199    let elapsed_before = started_at.elapsed();
200    trace!(
201        target: "engine::root:sparse",
202        level=SPARSE_TRIE_INCREMENTAL_LEVEL,
203        "Calculating intermediate nodes below trie level"
204    );
205    trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
206
207    let elapsed = started_at.elapsed();
208    let below_level_elapsed = elapsed - elapsed_before;
209    trace!(
210        target: "engine::root:sparse",
211        level=SPARSE_TRIE_INCREMENTAL_LEVEL,
212        ?below_level_elapsed,
213        "Intermediate nodes calculated"
214    );
215
216    Ok(elapsed)
217}