reth_engine_tree/tree/payload_processor/
sparse_trie.rs1use crate::tree::payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate};
4use alloy_primitives::B256;
5use rayon::iter::{ParallelBridge, ParallelIterator};
6use reth_trie::{updates::TrieUpdates, Nibbles};
7use reth_trie_parallel::root::ParallelStateRootError;
8use reth_trie_sparse::{
9    errors::{SparseStateTrieResult, SparseTrieErrorKind},
10    provider::{TrieNodeProvider, TrieNodeProviderFactory},
11    ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrieInterface,
12};
13use smallvec::SmallVec;
14use std::{
15    sync::mpsc,
16    time::{Duration, Instant},
17};
18use tracing::{debug, debug_span, instrument, trace};
19
/// Task that receives [`SparseTrieUpdate`]s over a channel, applies them to a
/// [`SparseStateTrie`], and computes the state root once the channel is closed.
///
/// `A` and `S` are the type parameters forwarded to [`SparseStateTrie`]; both default to
/// [`SerialSparseTrie`].
pub(super) struct SparseTrieTask<BPF, A = SerialSparseTrie, S = SerialSparseTrie>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
{
    /// Receiving end of the channel that delivers pending updates.
    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
    /// Trie the updates are applied to; `run` hands it back to the caller when it finishes.
    pub(super) trie: SparseStateTrie<A, S>,
    /// Histograms recording per-update, final-root and total durations.
    pub(super) metrics: MultiProofTaskMetrics,
    /// Factory passed to every trie operation that needs a node provider.
    blinded_provider_factory: BPF,
}
35
36impl<BPF, A, S> SparseTrieTask<BPF, A, S>
37where
38    BPF: TrieNodeProviderFactory + Send + Sync + Clone,
39    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
40    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
41    A: SparseTrieInterface + Send + Sync + Default,
42    S: SparseTrieInterface + Send + Sync + Default + Clone,
43{
44    pub(super) fn new_with_cleared_trie(
46        updates: mpsc::Receiver<SparseTrieUpdate>,
47        blinded_provider_factory: BPF,
48        metrics: MultiProofTaskMetrics,
49        sparse_state_trie: ClearedSparseStateTrie<A, S>,
50    ) -> Self {
51        Self { updates, metrics, trie: sparse_state_trie.into_inner(), blinded_provider_factory }
52    }
53
54    #[instrument(
65        level = "debug",
66        target = "engine::tree::payload_processor::sparse_trie",
67        skip_all
68    )]
69    pub(super) fn run(
70        mut self,
71    ) -> (Result<StateRootComputeOutcome, ParallelStateRootError>, SparseStateTrie<A, S>) {
72        let result = self.run_inner();
74        (result, self.trie)
75    }
76
77    fn run_inner(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
81        let now = Instant::now();
82
83        let mut num_iterations = 0;
84
85        while let Ok(mut update) = self.updates.recv() {
86            num_iterations += 1;
87            let mut num_updates = 1;
88            let _enter =
89                debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
90                    .entered();
91            while let Ok(next) = self.updates.try_recv() {
92                update.extend(next);
93                num_updates += 1;
94            }
95            drop(_enter);
96
97            debug!(
98                target: "engine::root",
99                num_updates,
100                account_proofs = update.multiproof.account_subtree.len(),
101                storage_proofs = update.multiproof.storages.len(),
102                "Updating sparse trie"
103            );
104
105            let elapsed =
106                update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
107                    .map_err(|e| {
108                        ParallelStateRootError::Other(format!(
109                            "could not calculate state root: {e:?}"
110                        ))
111                    })?;
112            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
113            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
114        }
115
116        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
117
118        let start = Instant::now();
119        let (state_root, trie_updates) =
120            self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
121                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
122            })?;
123
124        self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
125        self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
126
127        Ok(StateRootComputeOutcome { state_root, trie_updates })
128    }
129}
130
/// Values produced by a successful state root computation.
#[derive(Debug)]
pub struct StateRootComputeOutcome {
    /// The computed state root.
    pub state_root: B256,
    /// The trie updates returned together with the root.
    pub trie_updates: TrieUpdates,
}
140
141#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
143pub(crate) fn update_sparse_trie<BPF, A, S>(
144    trie: &mut SparseStateTrie<A, S>,
145    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
146    blinded_provider_factory: &BPF,
147) -> SparseStateTrieResult<Duration>
148where
149    BPF: TrieNodeProviderFactory + Send + Sync,
150    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
151    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
152    A: SparseTrieInterface + Send + Sync + Default,
153    S: SparseTrieInterface + Send + Sync + Default + Clone,
154{
155    trace!(target: "engine::root::sparse", "Updating sparse trie");
156    let started_at = Instant::now();
157
158    trie.reveal_decoded_multiproof(multiproof)?;
160    let reveal_multiproof_elapsed = started_at.elapsed();
161    trace!(
162        target: "engine::root::sparse",
163        ?reveal_multiproof_elapsed,
164        "Done revealing multiproof"
165    );
166
167    let span = tracing::Span::current();
169    let (tx, rx) = mpsc::channel();
170    state
171        .storages
172        .into_iter()
173        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
174        .par_bridge()
175        .map(|(address, storage, storage_trie)| {
176            let _enter =
177                debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: span.clone(), "storage trie", ?address)
178                    .entered();
179
180            trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
181            let storage_provider = blinded_provider_factory.storage_node_provider(address);
182            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
183
184            if storage.wiped {
185                trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
186                storage_trie.wipe()?;
187            }
188
189            let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();
194
195            for (slot, value) in storage.storage {
196                let slot_nibbles = Nibbles::unpack(slot);
197
198                if value.is_zero() {
199                    removed_slots.push(slot_nibbles);
200                    continue;
201                }
202
203                trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
204                storage_trie.update_leaf(
205                    slot_nibbles,
206                    alloy_rlp::encode_fixed_size(&value).to_vec(),
207                    &storage_provider,
208                )?;
209            }
210
211            for slot_nibbles in removed_slots {
212                trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
213                storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
214            }
215
216            storage_trie.root();
217
218            SparseStateTrieResult::Ok((address, storage_trie))
219        })
220        .for_each_init(
221            || tx.clone(),
222            |tx, result| {
223                let _ = tx.send(result);
224            },
225        );
226    drop(tx);
227
228    let mut removed_accounts = Vec::new();
233
234    let _enter =
236        tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
237            .entered();
238    for result in rx {
239        let (address, storage_trie) = result?;
240        trie.insert_storage_trie(address, storage_trie);
241
242        if let Some(account) = state.accounts.remove(&address) {
243            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
246            if !trie.update_account(
247                address,
248                account.unwrap_or_default(),
249                blinded_provider_factory,
250            )? {
251                removed_accounts.push(address);
252            }
253        } else if trie.is_account_revealed(address) {
254            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
256            if !trie.update_account_storage_root(address, blinded_provider_factory)? {
257                removed_accounts.push(address);
258            }
259        }
260    }
261
262    for (address, account) in state.accounts {
264        trace!(target: "engine::root::sparse", ?address, "Updating account");
265        if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
266            removed_accounts.push(address);
267        }
268    }
269
270    for address in removed_accounts {
272        trace!(target: "engine::root::sparse", ?address, "Removing account");
273        let nibbles = Nibbles::unpack(address);
274        trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
275    }
276
277    let elapsed_before = started_at.elapsed();
278    trace!(
279        target: "engine::root::sparse",
280        "Calculating subtries"
281    );
282    trie.calculate_subtries();
283
284    let elapsed = started_at.elapsed();
285    let below_level_elapsed = elapsed - elapsed_before;
286    trace!(
287        target: "engine::root::sparse",
288        ?below_level_elapsed,
289        "Intermediate nodes calculated"
290    );
291
292    Ok(elapsed)
293}