Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use crate::tree::{
4    multiproof::{
5        dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
6        VersionedMultiProofTargets, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
7    },
8    payload_processor::multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
9};
10use alloy_primitives::B256;
11use alloy_rlp::{Decodable, Encodable};
12use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
13use rayon::iter::ParallelIterator;
14use reth_primitives_traits::{Account, ParallelBridgeBuffered};
15use reth_tasks::Runtime;
16use reth_trie::{
17    proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
18    TrieAccount, EMPTY_ROOT_HASH, TRIE_ACCOUNT_RLP_MAX_SIZE,
19};
20use reth_trie_parallel::{
21    proof_task::{
22        AccountMultiproofInput, ProofResult, ProofResultContext, ProofResultMessage,
23        ProofWorkerHandle,
24    },
25    root::ParallelStateRootError,
26    targets_v2::MultiProofTargetsV2,
27};
28use reth_trie_sparse::{
29    errors::{SparseStateTrieResult, SparseTrieErrorKind, SparseTrieResult},
30    provider::{TrieNodeProvider, TrieNodeProviderFactory},
31    DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie, SparseTrie,
32};
33use revm_primitives::{hash_map::Entry, B256Map};
34use smallvec::SmallVec;
35use std::{
36    sync::mpsc,
37    time::{Duration, Instant},
38};
39use tracing::{debug, debug_span, error, instrument, trace};
40
41#[expect(clippy::large_enum_variant)]
42pub(super) enum SpawnedSparseTrieTask<BPF, A, S>
43where
44    BPF: TrieNodeProviderFactory + Send + Sync,
45    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
46    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
47    A: SparseTrie + Send + Sync + Default,
48    S: SparseTrie + Send + Sync + Default + Clone,
49{
50    Cleared(SparseTrieTask<BPF, A, S>),
51    Cached(SparseTrieCacheTask<A, S>),
52}
53
54impl<BPF, A, S> SpawnedSparseTrieTask<BPF, A, S>
55where
56    BPF: TrieNodeProviderFactory + Send + Sync + Clone,
57    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
58    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
59    A: SparseTrie + Send + Sync + Default,
60    S: SparseTrie + Send + Sync + Default + Clone,
61{
62    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
63        match self {
64            Self::Cleared(task) => task.run(),
65            Self::Cached(task) => task.run(),
66        }
67    }
68
69    pub(super) fn into_trie_for_reuse(
70        self,
71        prune_depth: usize,
72        max_storage_tries: usize,
73        max_nodes_capacity: usize,
74        max_values_capacity: usize,
75        disable_pruning: bool,
76    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
77        match self {
78            Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
79            Self::Cached(task) => task.into_trie_for_reuse(
80                prune_depth,
81                max_storage_tries,
82                max_nodes_capacity,
83                max_values_capacity,
84                disable_pruning,
85            ),
86        }
87    }
88
89    pub(super) fn into_cleared_trie(
90        self,
91        max_nodes_capacity: usize,
92        max_values_capacity: usize,
93    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
94        match self {
95            Self::Cleared(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
96            Self::Cached(task) => task.into_cleared_trie(max_nodes_capacity, max_values_capacity),
97        }
98    }
99}
100
101/// A task responsible for populating the sparse trie.
102pub(super) struct SparseTrieTask<BPF, A = ParallelSparseTrie, S = ParallelSparseTrie>
103where
104    BPF: TrieNodeProviderFactory + Send + Sync,
105    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
106    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
107{
108    /// Receives updates from the state root task.
109    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
110    /// `SparseStateTrie` used for computing the state root.
111    pub(super) trie: SparseStateTrie<A, S>,
112    pub(super) metrics: MultiProofTaskMetrics,
113    /// Trie node provider factory.
114    blinded_provider_factory: BPF,
115}
116
117impl<BPF, A, S> SparseTrieTask<BPF, A, S>
118where
119    BPF: TrieNodeProviderFactory + Send + Sync + Clone,
120    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
121    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
122    A: SparseTrie + Send + Sync + Default,
123    S: SparseTrie + Send + Sync + Default + Clone,
124{
125    /// Creates a new sparse trie task with the given trie.
126    pub(super) const fn new(
127        updates: mpsc::Receiver<SparseTrieUpdate>,
128        blinded_provider_factory: BPF,
129        metrics: MultiProofTaskMetrics,
130        trie: SparseStateTrie<A, S>,
131    ) -> Self {
132        Self { updates, metrics, trie, blinded_provider_factory }
133    }
134
135    /// Runs the sparse trie task to completion, computing the state root.
136    ///
137    /// Receives [`SparseTrieUpdate`]s until the channel is closed, applying each update
138    /// to the trie. Once all updates are processed, computes and returns the final state root.
139    #[instrument(
140        name = "SparseTrieTask::run",
141        level = "debug",
142        target = "engine::tree::payload_processor::sparse_trie",
143        skip_all
144    )]
145    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
146        let now = Instant::now();
147
148        let mut num_iterations = 0;
149
150        while let Ok(mut update) = self.updates.recv() {
151            num_iterations += 1;
152            let mut num_updates = 1;
153            let _enter =
154                debug_span!(target: "engine::tree::payload_processor::sparse_trie", "drain updates")
155                    .entered();
156            while let Ok(next) = self.updates.try_recv() {
157                update.extend(next);
158                num_updates += 1;
159            }
160            drop(_enter);
161
162            debug!(
163                target: "engine::root",
164                num_updates,
165                account_proofs = update.multiproof.account_proofs_len(),
166                storage_proofs = update.multiproof.storage_proofs_len(),
167                "Updating sparse trie"
168            );
169
170            let elapsed =
171                update_sparse_trie(&mut self.trie, update, &self.blinded_provider_factory)
172                    .map_err(|e| {
173                        ParallelStateRootError::Other(format!(
174                            "could not calculate state root: {e:?}"
175                        ))
176                    })?;
177            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
178            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
179        }
180
181        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
182
183        let start = Instant::now();
184        let (state_root, trie_updates) =
185            self.trie.root_with_updates(&self.blinded_provider_factory).map_err(|e| {
186                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
187            })?;
188
189        let end = Instant::now();
190        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
191        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
192
193        Ok(StateRootComputeOutcome { state_root, trie_updates })
194    }
195
196    /// Clears and shrinks the trie, discarding all state.
197    ///
198    /// Use this when the payload was invalid or cancelled - we don't want to preserve
199    /// potentially invalid trie state, but we keep the allocations for reuse.
200    pub(super) fn into_cleared_trie(
201        self,
202        max_nodes_capacity: usize,
203        max_values_capacity: usize,
204    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
205        let Self { mut trie, .. } = self;
206        trie.clear();
207        trie.shrink_to(max_nodes_capacity, max_values_capacity);
208        let deferred = trie.take_deferred_drops();
209        (trie, deferred)
210    }
211}
212
/// Upper bound on the number of execution/prewarm messages whose updates are buffered (in
/// `new_account_updates`/`new_storage_updates`) before they are applied to the trie even though
/// more messages are still queued.
///
/// The buffer is flushed once the number of buffered messages *exceeds* this value.
const MAX_PENDING_UPDATES: usize = 100;
215
216/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
217pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
218    /// Sender for proof results.
219    proof_result_tx: CrossbeamSender<ProofResultMessage>,
220    /// Receiver for proof results directly from workers.
221    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
222    /// Receives updates from execution and prewarming.
223    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
224    /// `SparseStateTrie` used for computing the state root.
225    trie: SparseStateTrie<A, S>,
226    /// Handle to the proof worker pools (storage and account).
227    proof_worker_handle: ProofWorkerHandle,
228
229    /// The size of proof targets chunk to spawn in one calculation.
230    /// If None, chunking is disabled and all targets are processed in a single proof.
231    chunk_size: Option<usize>,
232    /// If this number is exceeded and chunking is enabled, then this will override whether or not
233    /// there are any active workers and force chunking across workers. This is to prevent tasks
234    /// which are very long from hitting a single worker.
235    max_targets_for_chunking: usize,
236
237    /// Account trie updates.
238    account_updates: B256Map<LeafUpdate>,
239    /// Storage trie updates. hashed address -> slot -> update.
240    storage_updates: B256Map<B256Map<LeafUpdate>>,
241
242    /// Account updates that are buffered but were not yet applied to the trie.
243    new_account_updates: B256Map<LeafUpdate>,
244    /// Storage updates that are buffered but were not yet applied to the trie.
245    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
246    /// Account updates that are blocked by storage root calculation or account reveal.
247    ///
248    /// Those are being moved into `account_updates` once storage roots
249    /// are revealed and/or calculated.
250    ///
251    /// Invariant: for each entry in `pending_account_updates` account must either be already
252    /// revealed in the trie or have an entry in `account_updates`.
253    ///
254    /// Values can be either of:
255    ///   - None: account had a storage update and is awaiting storage root calculation and/or
256    ///     account node reveal to complete.
257    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
258    ///     to complete.
259    pending_account_updates: B256Map<Option<Option<Account>>>,
260    /// Cache of account proof targets that were already fetched/requested from the proof workers.
261    /// account -> lowest `min_len` requested.
262    fetched_account_targets: B256Map<u8>,
263    /// Cache of storage proof targets that have already been fetched/requested from the proof
264    /// workers. account -> slot -> lowest `min_len` requested.
265    fetched_storage_targets: B256Map<B256Map<u8>>,
266    /// Reusable buffer for RLP encoding of accounts.
267    account_rlp_buf: Vec<u8>,
268    /// Whether the last state update has been received.
269    finished_state_updates: bool,
270    /// Pending targets to be dispatched to the proof workers.
271    pending_targets: MultiProofTargetsV2,
272    /// Number of pending execution/prewarming updates received but not yet passed to
273    /// `update_leaves`.
274    pending_updates: usize,
275
276    /// Metrics for the sparse trie.
277    metrics: MultiProofTaskMetrics,
278}
279
280impl<A, S> SparseTrieCacheTask<A, S>
281where
282    A: SparseTrie + Default,
283    S: SparseTrie + Default + Clone,
284{
285    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
286    pub(super) fn new_with_trie(
287        executor: &Runtime,
288        updates: CrossbeamReceiver<MultiProofMessage>,
289        proof_worker_handle: ProofWorkerHandle,
290        metrics: MultiProofTaskMetrics,
291        trie: SparseStateTrie<A, S>,
292        chunk_size: Option<usize>,
293    ) -> Self {
294        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
295        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
296
297        let parent_span = tracing::Span::current();
298        executor.spawn_blocking(move || {
299            let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
300            Self::run_hashing_task(updates, hashed_state_tx)
301        });
302
303        Self {
304            proof_result_tx,
305            proof_result_rx,
306            updates: hashed_state_rx,
307            proof_worker_handle,
308            trie,
309            chunk_size,
310            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
311            account_updates: Default::default(),
312            storage_updates: Default::default(),
313            new_account_updates: Default::default(),
314            new_storage_updates: Default::default(),
315            pending_account_updates: Default::default(),
316            fetched_account_targets: Default::default(),
317            fetched_storage_targets: Default::default(),
318            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
319            finished_state_updates: Default::default(),
320            pending_targets: Default::default(),
321            pending_updates: Default::default(),
322            metrics,
323        }
324    }
325
326    /// Runs the hashing task that drains updates from the channel and converts them to
327    /// `HashedPostState` in parallel.
328    fn run_hashing_task(
329        updates: CrossbeamReceiver<MultiProofMessage>,
330        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
331    ) {
332        while let Ok(message) = updates.recv() {
333            let msg = match message {
334                MultiProofMessage::PrefetchProofs(targets) => {
335                    SparseTrieTaskMessage::PrefetchProofs(targets)
336                }
337                MultiProofMessage::StateUpdate(_, state) => {
338                    let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing state update", update_len = state.len()).entered();
339                    let hashed = evm_state_to_hashed_post_state(state);
340                    SparseTrieTaskMessage::HashedState(hashed)
341                }
342                MultiProofMessage::FinishedStateUpdates => {
343                    SparseTrieTaskMessage::FinishedStateUpdates
344                }
345                MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
346                    continue
347                }
348                MultiProofMessage::HashedStateUpdate(state) => {
349                    SparseTrieTaskMessage::HashedState(state)
350                }
351            };
352            if hashed_state_tx.send(msg).is_err() {
353                break;
354            }
355        }
356    }
357
358    /// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
359    ///
360    /// Should be called after the state root result has been sent.
361    ///
362    /// When `disable_pruning` is true, the trie is preserved without any node pruning,
363    /// storage trie eviction, or capacity shrinking, keeping the full cache intact for
364    /// benchmarking purposes.
365    pub(super) fn into_trie_for_reuse(
366        self,
367        prune_depth: usize,
368        max_storage_tries: usize,
369        max_nodes_capacity: usize,
370        max_values_capacity: usize,
371        disable_pruning: bool,
372    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
373        let Self { mut trie, .. } = self;
374        if !disable_pruning {
375            trie.prune(prune_depth, max_storage_tries);
376            trie.shrink_to(max_nodes_capacity, max_values_capacity);
377        }
378        let deferred = trie.take_deferred_drops();
379        (trie, deferred)
380    }
381
382    /// Clears and shrinks the trie, discarding all state.
383    ///
384    /// Use this when the payload was invalid or cancelled - we don't want to preserve
385    /// potentially invalid trie state, but we keep the allocations for reuse.
386    pub(super) fn into_cleared_trie(
387        self,
388        max_nodes_capacity: usize,
389        max_values_capacity: usize,
390    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
391        let Self { mut trie, .. } = self;
392        trie.clear();
393        trie.shrink_to(max_nodes_capacity, max_values_capacity);
394        let deferred = trie.take_deferred_drops();
395        (trie, deferred)
396    }
397
398    /// Runs the sparse trie task to completion.
399    ///
400    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
401    /// to the trie and schedules proof fetching when needed.
402    ///
403    /// This concludes once the last state update has been received and processed.
404    #[instrument(
405        name = "SparseTrieCacheTask::run",
406        level = "debug",
407        target = "engine::tree::payload_processor::sparse_trie",
408        skip_all
409    )]
410    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
411        let now = Instant::now();
412
413        loop {
414            crossbeam_channel::select_biased! {
415                recv(self.updates) -> message => {
416                    let update = match message {
417                        Ok(m) => m,
418                        Err(_) => {
419                            return Err(ParallelStateRootError::Other(
420                                "updates channel disconnected before state root calculation".to_string(),
421                            ))
422                        }
423                    };
424
425                    self.on_message(update);
426                    self.pending_updates += 1;
427                }
428                recv(self.proof_result_rx) -> message => {
429                    let Ok(result) = message else {
430                        unreachable!("we own the sender half")
431                    };
432                    let ProofResult::V2(mut result) = result.result? else {
433                        unreachable!("sparse trie as cache must only be used with multiproof v2");
434                    };
435
436                    while let Ok(next) = self.proof_result_rx.try_recv() {
437                        let ProofResult::V2(res) = next.result? else {
438                            unreachable!("sparse trie as cache must only be used with multiproof v2");
439                        };
440                        result.extend(res);
441                    }
442
443                    self.on_proof_result(result)?;
444                },
445            }
446
447            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
448                // If we don't have any pending messages, we can spend some time on computing
449                // storage roots and promoting account updates.
450                self.dispatch_pending_targets();
451                self.process_new_updates()?;
452                self.promote_pending_account_updates()?;
453
454                if self.finished_state_updates &&
455                    self.account_updates.is_empty() &&
456                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
457                {
458                    break;
459                }
460
461                self.dispatch_pending_targets();
462            } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
463                // If we don't have any pending updates OR we've accumulated a lot already, apply
464                // them to the trie,
465                self.process_new_updates()?;
466                self.dispatch_pending_targets();
467            } else if self.pending_targets.chunking_length() > self.chunk_size.unwrap_or_default() {
468                // Make sure to dispatch targets if we've accumulated a lot of them.
469                self.dispatch_pending_targets();
470            }
471        }
472
473        debug!(target: "engine::root", "All proofs processed, ending calculation");
474
475        let start = Instant::now();
476        let (state_root, trie_updates) =
477            self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
478                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
479            })?;
480
481        let end = Instant::now();
482        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
483        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
484
485        Ok(StateRootComputeOutcome { state_root, trie_updates })
486    }
487
488    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
489    fn on_message(&mut self, message: SparseTrieTaskMessage) {
490        match message {
491            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
492            SparseTrieTaskMessage::HashedState(hashed_state) => {
493                self.on_hashed_state_update(hashed_state)
494            }
495            SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
496        }
497    }
498
499    #[instrument(
500        level = "trace",
501        target = "engine::tree::payload_processor::sparse_trie",
502        skip_all
503    )]
504    fn on_prewarm_targets(&mut self, targets: VersionedMultiProofTargets) {
505        let VersionedMultiProofTargets::V2(targets) = targets else {
506            unreachable!("sparse trie as cache must only be used with V2 multiproof targets");
507        };
508
509        for target in targets.account_targets {
510            // Only touch accounts that are not yet present in the updates set.
511            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
512        }
513
514        for (address, slots) in targets.storage_targets {
515            for slot in slots {
516                // Only touch storages that are not yet present in the updates set.
517                self.new_storage_updates
518                    .entry(address)
519                    .or_default()
520                    .entry(slot.key())
521                    .or_insert(LeafUpdate::Touched);
522            }
523
524            // Touch corresponding account leaf to make sure its revealed in accounts trie for
525            // storage root update.
526            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
527        }
528    }
529
530    /// Processes a hashed state update and encodes all state changes as trie updates.
531    #[instrument(
532        level = "trace",
533        target = "engine::tree::payload_processor::sparse_trie",
534        skip_all
535    )]
536    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
537        for (address, storage) in hashed_state_update.storages {
538            for (slot, value) in storage.storage {
539                let encoded = if value.is_zero() {
540                    Vec::new()
541                } else {
542                    alloy_rlp::encode_fixed_size(&value).to_vec()
543                };
544                self.new_storage_updates
545                    .entry(address)
546                    .or_default()
547                    .insert(slot, LeafUpdate::Changed(encoded));
548
549                // Remove an existing storage update if it exists.
550                self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
551            }
552
553            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
554            // trie for storage root update.
555            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
556
557            // Make sure account is tracked in `pending_account_updates` so that once storage root
558            // is computed, it will be updated in the accounts trie.
559            self.pending_account_updates.entry(address).or_insert(None);
560        }
561
562        for (address, account) in hashed_state_update.accounts {
563            // Track account as touched.
564            //
565            // This might overwrite an existing update, which is fine, because storage root from it
566            // is already tracked in the trie and can be easily fetched again.
567            self.new_account_updates.insert(address, LeafUpdate::Touched);
568
569            // Track account in `pending_account_updates` so that once storage root is computed,
570            // it will be updated in the accounts trie.
571            self.pending_account_updates.insert(address, Some(account));
572        }
573    }
574
575    fn on_proof_result(
576        &mut self,
577        result: DecodedMultiProofV2,
578    ) -> Result<(), ParallelStateRootError> {
579        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
580            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
581        })
582    }
583
584    #[instrument(
585        level = "debug",
586        target = "engine::tree::payload_processor::sparse_trie",
587        skip_all
588    )]
589    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
590        self.pending_updates = 0;
591
592        // Firstly apply all new storage and account updates to the tries.
593        self.process_leaf_updates(true)?;
594
595        for (address, mut new) in self.new_storage_updates.drain() {
596            let updates = self.storage_updates.entry(address).or_default();
597            for (slot, new) in new.drain() {
598                match updates.entry(slot) {
599                    Entry::Occupied(mut entry) => {
600                        // Only overwrite existing entries with new values
601                        if new.is_changed() {
602                            entry.insert(new);
603                        }
604                    }
605                    Entry::Vacant(entry) => {
606                        entry.insert(new);
607                    }
608                }
609            }
610        }
611
612        for (address, new) in self.new_account_updates.drain() {
613            match self.account_updates.entry(address) {
614                Entry::Occupied(mut entry) => {
615                    if new.is_changed() {
616                        entry.insert(new);
617                    }
618                }
619                Entry::Vacant(entry) => {
620                    entry.insert(new);
621                }
622            }
623        }
624
625        Ok(())
626    }
627
628    /// Applies all account and storage leaf updates to corresponding tries and collects any new
629    /// multiproof targets.
630    #[instrument(
631        level = "debug",
632        target = "engine::tree::payload_processor::sparse_trie",
633        skip_all
634    )]
635    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
636        let storage_updates =
637            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
638
639        // Process all storage updates in parallel, skipping tries with no pending updates.
640        let span = tracing::Span::current();
641        let storage_results = storage_updates
642            .iter_mut()
643            .filter(|(_, updates)| !updates.is_empty())
644            .map(|(address, updates)| {
645                let trie = self.trie.take_or_create_storage_trie(address);
646                let fetched = self.fetched_storage_targets.remove(address).unwrap_or_default();
647
648                (address, updates, fetched, trie)
649            })
650            .par_bridge_buffered()
651            .map(|(address, updates, mut fetched, mut trie)| {
652                let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie leaf updates", ?address).entered();
653                let mut targets = Vec::new();
654
655                trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
656                    Entry::Occupied(mut entry) => {
657                        if min_len < *entry.get() {
658                            entry.insert(min_len);
659                            targets.push(Target::new(path).with_min_len(min_len));
660                        }
661                    }
662                    Entry::Vacant(entry) => {
663                        entry.insert(min_len);
664                        targets.push(Target::new(path).with_min_len(min_len));
665                    }
666                })?;
667
668                SparseTrieResult::Ok((address, targets, fetched, trie))
669            })
670            .collect::<Result<Vec<_>, _>>()?;
671
672        drop(span);
673
674        for (address, targets, fetched, trie) in storage_results {
675            self.fetched_storage_targets.insert(*address, fetched);
676            self.trie.insert_storage_trie(*address, trie);
677
678            if !targets.is_empty() {
679                self.pending_targets.storage_targets.entry(*address).or_default().extend(targets);
680            }
681        }
682
683        // Process account trie updates and fill the account targets.
684        self.process_account_leaf_updates(new)?;
685
686        Ok(())
687    }
688
689    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
690    ///
691    /// Returns whether any updates were drained (applied to the trie).
692    #[instrument(
693        level = "debug",
694        target = "engine::tree::payload_processor::sparse_trie",
695        skip_all
696    )]
697    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
698        let account_updates =
699            if new { &mut self.new_account_updates } else { &mut self.account_updates };
700
701        let updates_len_before = account_updates.len();
702
703        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
704            match self.fetched_account_targets.entry(target) {
705                Entry::Occupied(mut entry) => {
706                    if min_len < *entry.get() {
707                        entry.insert(min_len);
708                        self.pending_targets
709                            .account_targets
710                            .push(Target::new(target).with_min_len(min_len));
711                    }
712                }
713                Entry::Vacant(entry) => {
714                    entry.insert(min_len);
715                    self.pending_targets
716                        .account_targets
717                        .push(Target::new(target).with_min_len(min_len));
718                }
719            }
720        })?;
721
722        Ok(account_updates.len() < updates_len_before)
723    }
724
725    /// Iterates through all storage tries for which all updates were processed, computes their
726    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
727    /// for accounts trie.
728    #[instrument(
729        level = "debug",
730        target = "engine::tree::payload_processor::sparse_trie",
731        skip_all
732    )]
733    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
734        self.process_leaf_updates(false)?;
735
736        if self.pending_account_updates.is_empty() {
737            return Ok(());
738        }
739
740        let span = debug_span!("compute_storage_roots").entered();
741        self
742            .trie
743            .storage_tries_mut()
744            .iter_mut()
745            .filter(|(address, trie)| {
746                self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
747                    !trie.is_root_cached()
748            })
749            .par_bridge_buffered()
750            .for_each(|(address, trie)| {
751                let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
752                trie.root().expect("updates are drained, trie should be revealed by now");
753            });
754        drop(span);
755
756        loop {
757            let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
758            // Now handle pending account updates that can be upgraded to a proper update.
759            let account_rlp_buf = &mut self.account_rlp_buf;
760            let mut num_promoted = 0;
761            self.pending_account_updates.retain(|addr, account| {
762                if let Some(updates) = self.storage_updates.get(addr) {
763                    if !updates.is_empty() {
764                        // If account has pending storage updates, it is still pending.
765                        return true;
766                    } else if let Some(account) = account.take() {
767                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
768                        let encoded = if account.is_none_or(|account| account.is_empty()) &&
769                            storage_root == EMPTY_ROOT_HASH
770                        {
771                            Vec::new()
772                        } else {
773                            account_rlp_buf.clear();
774                            account
775                                .unwrap_or_default()
776                                .into_trie_account(storage_root)
777                                .encode(account_rlp_buf);
778                            account_rlp_buf.clone()
779                        };
780                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
781                        num_promoted += 1;
782                        return false;
783                    }
784                }
785
786                // Get the current account state either from the trie or from latest account update.
787                let trie_account = if let Some(LeafUpdate::Changed(encoded)) = self.account_updates.get(addr) {
788                    Some(encoded).filter(|encoded| !encoded.is_empty())
789                } else if !self.account_updates.contains_key(addr) {
790                    self.trie.get_account_value(addr)
791                } else {
792                    // Needs to be revealed first
793                    return true;
794                };
795
796                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
797
798                let (account, storage_root) = if let Some(account) = account.take() {
799                    // If account is Some(_) here it means it didn't have any storage updates
800                    // and we can fetch the storage root directly from the account trie.
801                    //
802                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
803                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
804
805                    (account, storage_root)
806                } else {
807                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
808                };
809
810                let encoded = if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
811                    Vec::new()
812                } else {
813                    account_rlp_buf.clear();
814                    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
815                    account_rlp_buf.clone()
816                };
817                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
818                num_promoted += 1;
819
820                false
821            });
822            span.record("promoted", num_promoted);
823            drop(span);
824
825            // Only exit when no new updates are processed.
826            //
827            // We need to keep iterating if any updates are being drained because that might
828            // indicate that more pending account updates can be promoted.
829            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
830                break
831            }
832        }
833
834        Ok(())
835    }
836
837    #[instrument(
838        level = "debug",
839        target = "engine::tree::payload_processor::sparse_trie",
840        skip_all
841    )]
842    fn dispatch_pending_targets(&mut self) {
843        if !self.pending_targets.is_empty() {
844            let chunking_length = self.pending_targets.chunking_length();
845            dispatch_with_chunking(
846                std::mem::take(&mut self.pending_targets),
847                chunking_length,
848                self.chunk_size,
849                self.max_targets_for_chunking,
850                self.proof_worker_handle.available_account_workers(),
851                self.proof_worker_handle.available_storage_workers(),
852                MultiProofTargetsV2::chunks,
853                |proof_targets| {
854                    if let Err(e) = self.proof_worker_handle.dispatch_account_multiproof(
855                        AccountMultiproofInput::V2 {
856                            targets: proof_targets,
857                            proof_result_sender: ProofResultContext::new(
858                                self.proof_result_tx.clone(),
859                                0,
860                                HashedPostState::default(),
861                                Instant::now(),
862                            ),
863                        },
864                    ) {
865                        error!("failed to dispatch account multiproof: {e:?}");
866                    }
867                },
868            );
869        }
870    }
871}
872
873/// Message type for the sparse trie task.
874enum SparseTrieTaskMessage {
875    /// A hashed state update ready to be processed.
876    HashedState(HashedPostState),
877    /// Prefetch proof targets (passed through directly).
878    PrefetchProofs(VersionedMultiProofTargets),
879    /// Signals that all state updates have been received.
880    FinishedStateUpdates,
881}
882
883/// Outcome of the state root computation, including the state root itself with
884/// the trie updates.
885#[derive(Debug)]
886pub struct StateRootComputeOutcome {
887    /// The state root.
888    pub state_root: B256,
889    /// The trie updates.
890    pub trie_updates: TrieUpdates,
891}
892
/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
///
/// Steps, in order:
/// 1. Reveals the nodes in `multiproof`, which may be in either the legacy or the V2 format.
/// 2. Processes each entry of `state.storages` on a parallel iterator:
///    - the storage trie is taken out of `trie`;
///    - it is wiped if the storage was wiped;
///    - non-zero slots are updated, then zero-valued slots are removed (removals last);
///    - finally `root()` is called on it.
/// 3. Reinserts each processed storage trie into `trie` and updates its account leaf. The full
///    account is updated if `state.accounts` has an entry for it. Otherwise, only the storage
///    root is updated, and only if the account is revealed.
/// 4. Applies the remaining entries of `state.accounts`.
/// 5. Removes, via `remove_account_leaf`, every account whose update call returned `false`.
/// 6. Calls `calculate_subtries` on the trie.
///
/// The returned duration is measured from the start of this call until after
/// `calculate_subtries` returns.
///
/// # Errors
///
/// Returns an error if any of the following fails or is missing:
/// - revealing the multiproof;
/// - finding the storage trie to update in `trie` (reported as `SparseTrieErrorKind::Blind`);
/// - wiping a storage trie;
/// - updating or removing any storage or account leaf.
///
/// NOTE(review): on error, storage tries already taken out of `trie` in step 2 but not yet
/// reinserted in step 3 are dropped (the `result?` in the reinsertion loop returns early).
/// Confirm that callers discard the trie on error.
#[instrument(level = "debug", target = "engine::tree::payload_processor::sparse_trie", skip_all)]
pub(crate) fn update_sparse_trie<BPF, A, S>(
    trie: &mut SparseStateTrie<A, S>,
    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
    blinded_provider_factory: &BPF,
) -> SparseStateTrieResult<Duration>
where
    BPF: TrieNodeProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
    BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
    A: SparseTrie + Send + Sync + Default,
    S: SparseTrie + Send + Sync + Default + Clone,
{
    trace!(target: "engine::root::sparse", "Updating sparse trie");
    let started_at = Instant::now();

    // Reveal new accounts and storage slots.
    match multiproof {
        ProofResult::Legacy(decoded, _) => {
            trie.reveal_decoded_multiproof(decoded)?;
        }
        ProofResult::V2(decoded_v2) => {
            trie.reveal_decoded_multiproof_v2(decoded_v2)?;
        }
    }
    let reveal_multiproof_elapsed = started_at.elapsed();
    trace!(
        target: "engine::root::sparse",
        ?reveal_multiproof_elapsed,
        "Done revealing multiproof"
    );

    // Update storage slots with new values and calculate storage roots.
    // The current span is captured so the per-storage spans created inside the parallel
    // iterator can name it as their explicit parent.
    let span = tracing::Span::current();
    // Each storage trie is taken out of `trie` here and reinserted in the account loop below.
    let results: Vec<_> = state
        .storages
        .into_iter()
        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
        .par_bridge_buffered()
        .map(|(address, storage, storage_trie)| {
            let _enter =
                debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage trie", ?address)
                    .entered();

            trace!(target: "engine::tree::payload_processor::sparse_trie", "Updating storage");
            let storage_provider = blinded_provider_factory.storage_node_provider(address);
            // A storage trie that is not present in `trie` is reported as `Blind`.
            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;

            if storage.wiped {
                trace!(target: "engine::tree::payload_processor::sparse_trie", "Wiping storage");
                storage_trie.wipe()?;
            }

            // Defer leaf removals until after updates/additions, so that we don't delete an
            // intermediate branch node during a removal and then re-add that branch back during a
            // later leaf addition. This is an optimization, but also a requirement inherited from
            // multiproof generating, which can't know the order that leaf operations happen in.
            let mut removed_slots = SmallVec::<[Nibbles; 8]>::new();

            for (slot, value) in storage.storage {
                let slot_nibbles = Nibbles::unpack(slot);

                // A zero value means the slot is cleared; its removal is deferred (see above).
                if value.is_zero() {
                    removed_slots.push(slot_nibbles);
                    continue;
                }

                trace!(target: "engine::tree::payload_processor::sparse_trie", ?slot_nibbles, "Updating storage slot");
                storage_trie.update_leaf(
                    slot_nibbles,
                    alloy_rlp::encode_fixed_size(&value).to_vec(),
                    &storage_provider,
                )?;
            }

            for slot_nibbles in removed_slots {
                trace!(target: "engine::root::sparse", ?slot_nibbles, "Removing storage slot");
                storage_trie.remove_leaf(&slot_nibbles, &storage_provider)?;
            }

            // Compute the storage root inside the parallel section; the returned value is not
            // used here.
            storage_trie.root();

            SparseStateTrieResult::Ok((address, storage_trie))
        })
        .collect();

    // Defer leaf removals until after updates/additions, so that we don't delete an intermediate
    // branch node during a removal and then re-add that branch back during a later leaf addition.
    // This is an optimization, but also a requirement inherited from multiproof generating, which
    // can't know the order that leaf operations happen in.
    let mut removed_accounts = Vec::new();

    // Update account storage roots
    let _enter =
        tracing::debug_span!(target: "engine::tree::payload_processor::sparse_trie", "account trie")
            .entered();
    for result in results {
        let (address, storage_trie) = result?;
        trie.insert_storage_trie(address, storage_trie);

        if let Some(account) = state.accounts.remove(&address) {
            // If the account itself has an update, remove it from the state update and update in
            // one go instead of doing it down below.
            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
            // A `None` account is applied as the default account; a `false` return queues the
            // leaf for removal below.
            if !trie.update_account(
                address,
                account.unwrap_or_default(),
                blinded_provider_factory,
            )? {
                removed_accounts.push(address);
            }
        } else if trie.is_account_revealed(address) {
            // Otherwise, if the account is revealed, only update its storage root.
            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
            if !trie.update_account_storage_root(address, blinded_provider_factory)? {
                removed_accounts.push(address);
            }
        }
    }

    // Update accounts
    // Only accounts without a storage update remain here; the others were removed from
    // `state.accounts` in the loop above.
    for (address, account) in state.accounts {
        trace!(target: "engine::root::sparse", ?address, "Updating account");
        if !trie.update_account(address, account.unwrap_or_default(), blinded_provider_factory)? {
            removed_accounts.push(address);
        }
    }

    // Remove accounts
    for address in removed_accounts {
        trace!(target: "engine::root::sparse", ?address, "Removing account");
        let nibbles = Nibbles::unpack(address);
        trie.remove_account_leaf(&nibbles, blinded_provider_factory)?;
    }

    let elapsed_before = started_at.elapsed();
    trace!(
        target: "engine::root::sparse",
        "Calculating subtries"
    );
    trie.calculate_subtries();

    // Time spent in `calculate_subtries` alone, for tracing.
    let elapsed = started_at.elapsed();
    let below_level_elapsed = elapsed - elapsed_before;
    trace!(
        target: "engine::root::sparse",
        ?below_level_elapsed,
        "Intermediate nodes calculated"
    );

    Ok(elapsed)
}
1046
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, U256};
    use reth_trie_sparse::ParallelSparseTrie;

    /// The hashing task forwards a hashed state update unchanged. It then forwards the
    /// "finished" marker and closes its output once the input channel is closed.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();

        // One hashed account with a single storage slot.
        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);
        let expected_account =
            Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

        let mut storage = reth_trie::HashedStorage::new(false);
        storage.storage.insert(hashed_slot, slot_value);

        let mut hashed_state = HashedPostState::default();
        hashed_state.accounts.insert(hashed_address, Some(expected_account));
        hashed_state.storages.insert(hashed_address, storage);

        let worker = std::thread::spawn(move || {
            SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
                updates_rx,
                hashed_state_tx,
            );
        });

        for message in [
            MultiProofMessage::HashedStateUpdate(hashed_state),
            MultiProofMessage::FinishedStateUpdates,
        ] {
            updates_tx.send(message).unwrap();
        }
        drop(updates_tx);

        let first = hashed_state_rx.recv().unwrap();
        let SparseTrieTaskMessage::HashedState(received) = first else {
            panic!("expected HashedState message");
        };

        let received_account = received.accounts[&hashed_address].unwrap();
        assert_eq!(received_account.balance, expected_account.balance);
        assert_eq!(received_account.nonce, expected_account.nonce);

        let received_storage = &received.storages[&hashed_address];
        assert_eq!(received_storage.storage[&hashed_slot], slot_value);

        assert!(matches!(
            hashed_state_rx.recv().unwrap(),
            SparseTrieTaskMessage::FinishedStateUpdates
        ));

        // The output channel must be closed once the task has finished.
        assert!(hashed_state_rx.recv().is_err());
        worker.join().unwrap();
    }
}