Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use std::sync::Arc;
4
5use crate::tree::{
6    multiproof::{
7        dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
8        DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9    },
10    payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::ParallelIterator;
16use reth_primitives_traits::{Account, FastInstant as Instant, ParallelBridgeBuffered};
17use reth_tasks::Runtime;
18use reth_trie::{
19    updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20    TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24    proof_task::{
25        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26    },
27    root::ParallelStateRootError,
28};
29#[cfg(feature = "trie-debug")]
30use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
31use reth_trie_sparse::{
32    errors::SparseTrieResult, DeferredDrops, LeafUpdate, ParallelSparseTrie, SparseStateTrie,
33    SparseTrie,
34};
35use revm_primitives::{hash_map::Entry, B256Map};
36use tracing::{debug, debug_span, error, instrument, trace_span};
37
38/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
///
/// `SparseTrieCacheTask::run` compares its `pending_updates` counter against this value: once the
/// counter exceeds it, the buffered updates are applied to the trie even though more messages
/// are still waiting on the updates channel.
39const MAX_PENDING_UPDATES: usize = 100;
40
41/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
///
/// Incoming updates are first buffered in `new_account_updates` / `new_storage_updates`. When
/// they are processed they are offered to the trie once and then merged into `account_updates` /
/// `storage_updates`, which are offered to the trie again on later passes. Proof targets
/// reported while applying updates are queued in `pending_targets` and sent to the proof
/// workers in batches.
42pub(super) struct SparseTrieCacheTask<A = ParallelSparseTrie, S = ParallelSparseTrie> {
43    /// Sender for proof results.
    /// Kept here so a clone can be attached to every proof request that is dispatched.
44    proof_result_tx: CrossbeamSender<ProofResultMessage>,
45    /// Receiver for proof results directly from workers.
46    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
47    /// Receives updates from execution and prewarming.
    /// Fed by the hashing task that `new_with_trie` spawns.
48    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
49    /// `SparseStateTrie` used for computing the state root.
50    trie: SparseStateTrie<A, S>,
51    /// Handle to the proof worker pools (storage and account).
52    proof_worker_handle: ProofWorkerHandle,
53
54    /// The size of proof targets chunk to spawn in one calculation.
55    /// Always set (plain `usize`); forwarded to `dispatch_with_chunking` on every dispatch.
    /// `run` also dispatches queued targets early once their count exceeds this value.
56    chunk_size: usize,
57    /// If this number is exceeded and chunking is enabled, then this will override whether or not
58    /// there are any active workers and force chunking across workers. This is to prevent tasks
59    /// which are very long from hitting a single worker.
60    max_targets_for_chunking: usize,
61
62    /// Account trie updates.
    /// Holds updates that were already merged in from `new_account_updates`, plus promoted
    /// pending account updates.
63    account_updates: B256Map<LeafUpdate>,
64    /// Storage trie updates. hashed address -> slot -> update.
    /// Holds updates that were already merged in from `new_storage_updates`.
65    storage_updates: B256Map<B256Map<LeafUpdate>>,
66
67    /// Account updates that are buffered but were not yet applied to the trie.
68    new_account_updates: B256Map<LeafUpdate>,
69    /// Storage updates that are buffered but were not yet applied to the trie.
70    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
71    /// Account updates that are blocked by storage root calculation or account reveal.
72    ///
73    /// Those are being moved into `account_updates` once storage roots
74    /// are revealed and/or calculated.
75    ///
76    /// Invariant: for each entry in `pending_account_updates` account must either be already
77    /// revealed in the trie or have an entry in `account_updates`.
78    ///
79    /// Values can be either of:
80    ///   - None: account had a storage update and is awaiting storage root calculation and/or
81    ///     account node reveal to complete.
82    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
83    ///     to complete.
84    pending_account_updates: B256Map<Option<Option<Account>>>,
85    /// Cache of account proof targets that were already fetched/requested from the proof workers.
86    /// account -> lowest `min_len` requested.
87    fetched_account_targets: B256Map<u8>,
88    /// Cache of storage proof targets that have already been fetched/requested from the proof
89    /// workers. account -> slot -> lowest `min_len` requested.
90    fetched_storage_targets: B256Map<B256Map<u8>>,
91    /// Reusable buffer for RLP encoding of accounts.
92    account_rlp_buf: Vec<u8>,
93    /// Whether the last state update has been received.
94    finished_state_updates: bool,
95    /// Accumulated account leaf update cache hits.
    /// Recorded as a metric and reset to zero at the end of `run`.
96    account_cache_hits: u64,
97    /// Accumulated account leaf update cache misses.
98    account_cache_misses: u64,
99    /// Accumulated storage leaf update cache hits.
100    storage_cache_hits: u64,
101    /// Accumulated storage leaf update cache misses.
102    storage_cache_misses: u64,
103    /// Pending proof targets queued for dispatch to proof workers.
104    pending_targets: PendingTargets,
105    /// Number of pending execution/prewarming updates received but not yet passed to
106    /// `update_leaves`.
    /// Incremented by `run` for every received message and reset by `process_new_updates`.
107    pending_updates: usize,
108
109    /// Metrics for the sparse trie.
110    metrics: MultiProofTaskMetrics,
111}
112
113impl<A, S> SparseTrieCacheTask<A, S>
114where
115    A: SparseTrie + Default,
116    S: SparseTrie + Default + Clone,
117{
118    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
119    pub(super) fn new_with_trie(
120        executor: &Runtime,
121        updates: CrossbeamReceiver<MultiProofMessage>,
122        proof_worker_handle: ProofWorkerHandle,
123        metrics: MultiProofTaskMetrics,
124        trie: SparseStateTrie<A, S>,
125        chunk_size: usize,
126    ) -> Self {
127        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
128        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
129
130        let parent_span = tracing::Span::current();
131        executor.spawn_blocking_named("trie-hashing", move || {
132            let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
133            Self::run_hashing_task(updates, hashed_state_tx)
134        });
135
136        Self {
137            proof_result_tx,
138            proof_result_rx,
139            updates: hashed_state_rx,
140            proof_worker_handle,
141            trie,
142            chunk_size,
143            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
144            account_updates: Default::default(),
145            storage_updates: Default::default(),
146            new_account_updates: Default::default(),
147            new_storage_updates: Default::default(),
148            pending_account_updates: Default::default(),
149            fetched_account_targets: Default::default(),
150            fetched_storage_targets: Default::default(),
151            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
152            finished_state_updates: Default::default(),
153            account_cache_hits: 0,
154            account_cache_misses: 0,
155            storage_cache_hits: 0,
156            storage_cache_misses: 0,
157            pending_targets: Default::default(),
158            pending_updates: Default::default(),
159            metrics,
160        }
161    }
162
163    /// Runs the hashing task that drains updates from the channel and converts them to
164    /// `HashedPostState` in parallel.
165    fn run_hashing_task(
166        updates: CrossbeamReceiver<MultiProofMessage>,
167        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
168    ) {
169        while let Ok(message) = updates.recv() {
170            let msg = match message {
171                MultiProofMessage::PrefetchProofs(targets) => {
172                    SparseTrieTaskMessage::PrefetchProofs(targets)
173                }
174                MultiProofMessage::StateUpdate(_, state) => {
175                    let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
176                    let hashed = evm_state_to_hashed_post_state(state);
177                    SparseTrieTaskMessage::HashedState(hashed)
178                }
179                MultiProofMessage::FinishedStateUpdates => {
180                    SparseTrieTaskMessage::FinishedStateUpdates
181                }
182                MultiProofMessage::EmptyProof { .. } | MultiProofMessage::BlockAccessList(_) => {
183                    continue
184                }
185                MultiProofMessage::HashedStateUpdate(state) => {
186                    SparseTrieTaskMessage::HashedState(state)
187                }
188            };
189            if hashed_state_tx.send(msg).is_err() {
190                break;
191            }
192        }
193    }
194
195    /// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
196    ///
197    /// Should be called after the state root result has been sent.
198    ///
199    /// When `disable_pruning` is true, the trie is preserved without any node pruning,
200    /// storage trie eviction, or capacity shrinking, keeping the full cache intact for
201    /// benchmarking purposes.
202    pub(super) fn into_trie_for_reuse(
203        self,
204        prune_depth: usize,
205        max_storage_tries: usize,
206        max_nodes_capacity: usize,
207        max_values_capacity: usize,
208        disable_pruning: bool,
209        updates: &TrieUpdates,
210    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
211        let Self { mut trie, .. } = self;
212        trie.commit_updates(updates);
213        if !disable_pruning {
214            trie.prune(prune_depth, max_storage_tries);
215            trie.shrink_to(max_nodes_capacity, max_values_capacity);
216        }
217        let deferred = trie.take_deferred_drops();
218        (trie, deferred)
219    }
220
221    /// Clears and shrinks the trie, discarding all state.
222    ///
223    /// Use this when the payload was invalid or cancelled - we don't want to preserve
224    /// potentially invalid trie state, but we keep the allocations for reuse.
225    pub(super) fn into_cleared_trie(
226        self,
227        max_nodes_capacity: usize,
228        max_values_capacity: usize,
229    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
230        let Self { mut trie, .. } = self;
231        trie.clear();
232        trie.shrink_to(max_nodes_capacity, max_values_capacity);
233        let deferred = trie.take_deferred_drops();
234        (trie, deferred)
235    }
236
237    /// Runs the sparse trie task to completion.
238    ///
239    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
240    /// to the trie and schedules proof fetching when needed.
241    ///
242    /// This concludes once the last state update has been received and processed.
    ///
    /// On success the returned outcome carries the state root and the trie updates produced by
    /// `root_with_updates`.
    ///
    /// # Errors
    ///
    /// Returns an error if the updates channel disconnects before the loop finishes, if a
    /// received proof result is itself an error, if revealing a proof or applying updates to the
    /// trie fails, or if the final state root computation fails.
243    #[instrument(
244        name = "SparseTrieCacheTask::run",
245        level = "debug",
246        target = "engine::tree::payload_processor::sparse_trie",
247        skip_all
248    )]
249    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        // Start of the whole run; used for the total-duration metric at the end.
250        let now = Instant::now();
251
252        loop {
            // Start of the current timed phase; moved forward as each phase of this iteration
            // ends.
253            let mut t = Instant::now();
            // Biased select: when both channels are ready, the arm listed first (state updates)
            // is taken.
254            crossbeam_channel::select_biased! {
255                recv(self.updates) -> message => {
256                    self.metrics
257                        .sparse_trie_channel_wait_duration_histogram
258                        .record(t.elapsed());
259
260                    let update = match message {
261                        Ok(m) => m,
262                        Err(_) => {
263                            return Err(ParallelStateRootError::Other(
264                                "updates channel disconnected before state root calculation".to_string(),
265                            ))
266                        }
267                    };
268
                    // Only buffers the update (or sets the finished flag); the trie itself is
                    // touched later by `process_new_updates`.
269                    self.on_message(update);
                    // Counted for every message kind, including `FinishedStateUpdates`.
270                    self.pending_updates += 1;
271                }
272                recv(self.proof_result_rx) -> message => {
273                    let phase_end = Instant::now();
274                    self.metrics
275                        .sparse_trie_channel_wait_duration_histogram
276                        .record(phase_end.duration_since(t));
277                    t = phase_end;
278
279                    let Ok(result) = message else {
280                        unreachable!("we own the sender half")
281                    };
282
                    // Merge every proof result that is already queued so that they are revealed
                    // in a single pass.
283                    let mut result = result.result?;
284                    while let Ok(next) = self.proof_result_rx.try_recv() {
285                        let res = next.result?;
286                        result.extend(res);
287                    }
288
289                    let phase_end = Instant::now();
290                    self.metrics
291                        .sparse_trie_proof_coalesce_duration_histogram
292                        .record(phase_end.duration_since(t));
293                    t = phase_end;
294
295                    self.on_proof_result(result)?;
296                    self.metrics
297                        .sparse_trie_reveal_multiproof_duration_histogram
298                        .record(t.elapsed());
299                },
300            }
301
302            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
303                // If we don't have any pending messages, we can spend some time on computing
304                // storage roots and promoting account updates.
305                self.dispatch_pending_targets();
306                t = Instant::now();
307                self.process_new_updates()?;
308                self.promote_pending_account_updates()?;
309                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
310
                // Done once the end-of-updates marker was seen and neither `account_updates`
                // nor any per-account map in `storage_updates` has entries left.
311                if self.finished_state_updates &&
312                    self.account_updates.is_empty() &&
313                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
314                {
315                    break;
316                }
317
                // Processing above may have queued new proof targets; send them out before
                // blocking on the channels again.
318                self.dispatch_pending_targets();
319            } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
320                // If we don't have any pending updates OR we've accumulated a lot already, apply
321                // them to the trie,
322                t = Instant::now();
323                self.process_new_updates()?;
324                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
325                self.dispatch_pending_targets();
326            } else if self.pending_targets.len() > self.chunk_size {
327                // Make sure to dispatch targets if we've accumulated a lot of them.
328                self.dispatch_pending_targets();
329            }
330        }
331
332        debug!(target: "engine::root", "All proofs processed, ending calculation");
333
        // Final root computation; timed separately from the loop above.
334        let start = Instant::now();
335        let (state_root, trie_updates) =
336            self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
337                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
338            })?;
339
340        #[cfg(feature = "trie-debug")]
341        let debug_recorders = self.trie.take_debug_recorders();
342
343        let end = Instant::now();
344        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
345        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
346
        // Report the cache counters accumulated during this run, then reset them.
347        self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
348        self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
349        self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
350        self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
351        self.account_cache_hits = 0;
352        self.account_cache_misses = 0;
353        self.storage_cache_hits = 0;
354        self.storage_cache_misses = 0;
355
356        Ok(StateRootComputeOutcome {
357            state_root,
358            trie_updates: Arc::new(trie_updates),
359            #[cfg(feature = "trie-debug")]
360            debug_recorders,
361        })
362    }
363
364    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
365    fn on_message(&mut self, message: SparseTrieTaskMessage) {
366        match message {
367            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
368            SparseTrieTaskMessage::HashedState(hashed_state) => {
369                self.on_hashed_state_update(hashed_state)
370            }
371            SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
372        }
373    }
374
375    #[instrument(
376        level = "trace",
377        target = "engine::tree::payload_processor::sparse_trie",
378        skip_all
379    )]
380    fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
381        for target in targets.account_targets {
382            // Only touch accounts that are not yet present in the updates set.
383            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
384        }
385
386        for (address, slots) in targets.storage_targets {
387            for slot in slots {
388                // Only touch storages that are not yet present in the updates set.
389                self.new_storage_updates
390                    .entry(address)
391                    .or_default()
392                    .entry(slot.key())
393                    .or_insert(LeafUpdate::Touched);
394            }
395
396            // Touch corresponding account leaf to make sure its revealed in accounts trie for
397            // storage root update.
398            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
399        }
400    }
401
402    /// Processes a hashed state update and encodes all state changes as trie updates.
403    #[instrument(
404        level = "trace",
405        target = "engine::tree::payload_processor::sparse_trie",
406        skip_all
407    )]
408    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
409        for (address, storage) in hashed_state_update.storages {
410            for (slot, value) in storage.storage {
411                let encoded = if value.is_zero() {
412                    Vec::new()
413                } else {
414                    alloy_rlp::encode_fixed_size(&value).to_vec()
415                };
416                self.new_storage_updates
417                    .entry(address)
418                    .or_default()
419                    .insert(slot, LeafUpdate::Changed(encoded));
420
421                // Remove an existing storage update if it exists.
422                self.storage_updates.get_mut(&address).and_then(|updates| updates.remove(&slot));
423            }
424
425            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
426            // trie for storage root update.
427            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
428
429            // Make sure account is tracked in `pending_account_updates` so that once storage root
430            // is computed, it will be updated in the accounts trie.
431            self.pending_account_updates.entry(address).or_insert(None);
432        }
433
434        for (address, account) in hashed_state_update.accounts {
435            // Track account as touched.
436            //
437            // This might overwrite an existing update, which is fine, because storage root from it
438            // is already tracked in the trie and can be easily fetched again.
439            self.new_account_updates.insert(address, LeafUpdate::Touched);
440
441            // Track account in `pending_account_updates` so that once storage root is computed,
442            // it will be updated in the accounts trie.
443            self.pending_account_updates.insert(address, Some(account));
444        }
445    }
446
447    fn on_proof_result(
448        &mut self,
449        result: DecodedMultiProofV2,
450    ) -> Result<(), ParallelStateRootError> {
451        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
452            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
453        })
454    }
455
456    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
457        if self.pending_updates == 0 {
458            return Ok(());
459        }
460
461        let _span = debug_span!("process_new_updates").entered();
462        self.pending_updates = 0;
463
464        // Firstly apply all new storage and account updates to the tries.
465        self.process_leaf_updates(true)?;
466
467        for (address, mut new) in self.new_storage_updates.drain() {
468            match self.storage_updates.entry(address) {
469                Entry::Vacant(entry) => {
470                    entry.insert(new); // insert the whole map at once, no per-slot loop
471                }
472                Entry::Occupied(mut entry) => {
473                    let updates = entry.get_mut();
474                    for (slot, new) in new.drain() {
475                        match updates.entry(slot) {
476                            Entry::Occupied(mut slot_entry) => {
477                                if new.is_changed() {
478                                    slot_entry.insert(new);
479                                }
480                            }
481                            Entry::Vacant(slot_entry) => {
482                                slot_entry.insert(new);
483                            }
484                        }
485                    }
486                }
487            }
488        }
489
490        for (address, new) in self.new_account_updates.drain() {
491            match self.account_updates.entry(address) {
492                Entry::Occupied(mut entry) => {
493                    if new.is_changed() {
494                        entry.insert(new);
495                    }
496                }
497                Entry::Vacant(entry) => {
498                    entry.insert(new);
499                }
500            }
501        }
502
503        Ok(())
504    }
505
506    /// Applies all account and storage leaf updates to corresponding tries and collects any new
507    /// multiproof targets.
508    #[instrument(
509        level = "debug",
510        target = "engine::tree::payload_processor::sparse_trie",
511        skip_all
512    )]
513    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
514        let storage_updates =
515            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
516
517        // Process all storage updates, skipping tries with no pending updates.
518        let span = debug_span!("process_storage_leaf_updates").entered();
519        for (address, updates) in storage_updates {
520            if updates.is_empty() {
521                continue;
522            }
523            let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
524
525            let trie = self.trie.get_or_create_storage_trie_mut(*address);
526            let fetched = self.fetched_storage_targets.entry(*address).or_default();
527            let mut targets = Vec::new();
528
529            let updates_len_before = updates.len();
530            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
531                Entry::Occupied(mut entry) => {
532                    if min_len < *entry.get() {
533                        entry.insert(min_len);
534                        targets.push(ProofV2Target::new(path).with_min_len(min_len));
535                    }
536                }
537                Entry::Vacant(entry) => {
538                    entry.insert(min_len);
539                    targets.push(ProofV2Target::new(path).with_min_len(min_len));
540                }
541            })?;
542            let updates_len_after = updates.len();
543            self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
544            self.storage_cache_misses += updates_len_after as u64;
545
546            if !targets.is_empty() {
547                self.pending_targets.extend_storage_targets(address, targets);
548            }
549        }
550
551        drop(span);
552
553        // Process account trie updates and fill the account targets.
554        self.process_account_leaf_updates(new)?;
555
556        Ok(())
557    }
558
559    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
560    ///
561    /// Returns whether any updates were drained (applied to the trie).
562    #[instrument(
563        level = "debug",
564        target = "engine::tree::payload_processor::sparse_trie",
565        skip_all
566    )]
567    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
568        let account_updates =
569            if new { &mut self.new_account_updates } else { &mut self.account_updates };
570
571        let updates_len_before = account_updates.len();
572
573        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
574            match self.fetched_account_targets.entry(target) {
575                Entry::Occupied(mut entry) => {
576                    if min_len < *entry.get() {
577                        entry.insert(min_len);
578                        self.pending_targets
579                            .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
580                    }
581                }
582                Entry::Vacant(entry) => {
583                    entry.insert(min_len);
584                    self.pending_targets
585                        .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
586                }
587            }
588        })?;
589
590        let updates_len_after = account_updates.len();
591        self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
592        self.account_cache_misses += updates_len_after as u64;
593
594        Ok(updates_len_after < updates_len_before)
595    }
596
597    /// Iterates through all storage tries for which all updates were processed, computes their
598    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
599    /// for accounts trie.
    ///
    /// Promoting and applying the promoted updates to the accounts trie is repeated until a
    /// round promotes nothing or the accounts trie drains none of the outstanding updates.
    ///
    /// # Errors
    ///
    /// Propagates errors from applying leaf updates to the tries.
    ///
    /// # Panics
    ///
    /// Panics if a storage root that is expected to be available cannot be obtained, or if an
    /// account value read back for promotion is not valid [`TrieAccount`] RLP.
600    #[instrument(
601        level = "debug",
602        target = "engine::tree::payload_processor::sparse_trie",
603        skip_all
604    )]
605    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
        // Offer the outstanding (non-`new`) storage and account updates to the tries again.
606        self.process_leaf_updates(false)?;
607
608        if self.pending_account_updates.is_empty() {
609            return Ok(());
610        }
611
        // Compute roots for storage tries that have an entry in `storage_updates` with nothing
        // left to apply and whose root is not cached yet.
612        let span = debug_span!("compute_storage_roots").entered();
613        self
614            .trie
615            .storage_tries_mut()
616            .iter_mut()
617            .filter(|(address, trie)| {
618                self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
619                    !trie.is_root_cached()
620            })
621            .par_bridge_buffered()
622            .for_each(|(address, trie)| {
623                let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_root", ?address).entered();
624                trie.root().expect("updates are drained, trie should be revealed by now");
625            });
626        drop(span);
627
628        loop {
629            let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
630            // Now handle pending account updates that can be upgraded to a proper update.
631            let account_rlp_buf = &mut self.account_rlp_buf;
632            let mut num_promoted = 0;
            // `retain` semantics: returning `true` keeps the account pending, returning `false`
            // removes it after it has been promoted into `account_updates`.
633            self.pending_account_updates.retain(|addr, account| {
634                if let Some(updates) = self.storage_updates.get(addr) {
635                    if !updates.is_empty() {
636                        // If account has pending storage updates, it is still pending.
637                        return true;
638                    } else if let Some(account) = account.take() {
                        // Account change with fully applied storage updates: combine the new
                        // account state with the storage root of its storage trie.
639                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
640                        let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
641                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
642                        num_promoted += 1;
643                        return false;
644                    }
645                }
646
                // Reached when the account has no entry in `storage_updates`, or when its
                // storage updates are fully applied and the pending value is `None`.
647                // Get the current account state either from the trie or from latest account update.
648                let trie_account = match self.account_updates.get(addr) {
649                    Some(LeafUpdate::Changed(encoded)) => {
                        // An empty encoding is handled as "no account" here.
650                        Some(encoded).filter(|encoded| !encoded.is_empty())
651                    }
652                    // Needs to be revealed first
653                    Some(LeafUpdate::Touched) => return true,
654                    None => self.trie.get_account_value(addr),
655                };
656
657                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
658
659                let (account, storage_root) = if let Some(account) = account.take() {
660                    // If account is Some(_) here it means it didn't have any storage updates
661                    // and we can fetch the storage root directly from the account trie.
662                    //
663                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
664                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
665
666                    (account, storage_root)
667                } else {
                    // Only storage changed: keep the current account state and pair it with the
                    // storage root taken from the storage trie.
668                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
669                };
670
671                let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
672                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
673                num_promoted += 1;
674
                // Promoted: drop the entry from `pending_account_updates`.
675                false
676            });
677            span.record("promoted", num_promoted);
678            drop(span);
679
680            // Only exit when no new updates are processed.
681            //
682            // We need to keep iterating if any updates are being drained because that might
683            // indicate that more pending account updates can be promoted.
            // Note: the account updates are only re-applied when this round promoted something.
684            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
685                break
686            }
687        }
688
689        Ok(())
690    }
691
692    fn dispatch_pending_targets(&mut self) {
693        if self.pending_targets.is_empty() {
694            return;
695        }
696
697        let _span = debug_span!("dispatch_pending_targets").entered();
698        let (targets, chunking_length) = self.pending_targets.take();
699        dispatch_with_chunking(
700            targets,
701            chunking_length,
702            self.chunk_size,
703            self.max_targets_for_chunking,
704            self.proof_worker_handle.available_account_workers(),
705            self.proof_worker_handle.available_storage_workers(),
706            MultiProofTargetsV2::chunks,
707            |proof_targets| {
708                if let Err(e) =
709                    self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
710                        targets: proof_targets,
711                        proof_result_sender: ProofResultContext::new(
712                            self.proof_result_tx.clone(),
713                            HashedPostState::default(),
714                            Instant::now(),
715                        ),
716                    })
717                {
718                    error!("failed to dispatch account multiproof: {e:?}");
719                }
720            },
721        );
722    }
723}
724
725/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
726fn encode_account_leaf_value(
727    account: Option<Account>,
728    storage_root: B256,
729    account_rlp_buf: &mut Vec<u8>,
730) -> Vec<u8> {
731    if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
732        return Vec::new();
733    }
734
735    account_rlp_buf.clear();
736    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
737    account_rlp_buf.clone()
738}
739
740/// Pending proof targets queued for dispatch to proof workers, along with their count.
741#[derive(Default)]
742struct PendingTargets {
743    /// The proof targets.
744    targets: MultiProofTargetsV2,
745    /// Number of account + storage proof targets currently queued.
746    len: usize,
747}
748
749impl PendingTargets {
750    /// Returns the number of pending targets.
751    const fn len(&self) -> usize {
752        self.len
753    }
754
755    /// Returns `true` if there are no pending targets.
756    const fn is_empty(&self) -> bool {
757        self.len == 0
758    }
759
760    /// Takes the pending targets, replacing with empty defaults.
761    fn take(&mut self) -> (MultiProofTargetsV2, usize) {
762        (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
763    }
764
765    /// Adds a target to the account targets.
766    fn push_account_target(&mut self, target: ProofV2Target) {
767        self.targets.account_targets.push(target);
768        self.len += 1;
769    }
770
771    /// Extends storage targets for the given address.
772    fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
773        self.len += targets.len();
774        self.targets.storage_targets.entry(*address).or_default().extend(targets);
775    }
776}
777
778/// Message type for the sparse trie task.
779enum SparseTrieTaskMessage {
780    /// A hashed state update ready to be processed.
781    HashedState(HashedPostState),
782    /// Prefetch proof targets (passed through directly).
783    PrefetchProofs(MultiProofTargetsV2),
784    /// Signals that all state updates have been received.
785    FinishedStateUpdates,
786}
787
788/// Outcome of the state root computation, including the state root itself with
789/// the trie updates.
790#[derive(Debug, Clone)]
791pub struct StateRootComputeOutcome {
792    /// The state root.
793    pub state_root: B256,
794    /// The trie updates.
795    pub trie_updates: Arc<TrieUpdates>,
796    /// Debug recorders taken from the sparse tries, keyed by `None` for account trie
797    /// and `Some(address)` for storage tries.
798    #[cfg(feature = "trie-debug")]
799    pub debug_recorders: Vec<(Option<B256>, TrieDebugRecorder)>,
800}
801
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_trie_sparse::ParallelSparseTrie;

    /// Feeds a `HashedStateUpdate` followed by `FinishedStateUpdates` into the hashing task and
    /// checks that both arrive on the output channel, in order, with the account and storage
    /// data intact, after which the output channel reports disconnection.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();

        let address = keccak256(Address::random());
        let slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let value = U256::from(999);

        let mut hashed_state = HashedPostState::default();
        hashed_state.accounts.insert(
            address,
            Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
        );
        let mut storage = reth_trie::HashedStorage::new(false);
        storage.storage.insert(slot, value);
        hashed_state.storages.insert(address, storage);

        let expected_state = hashed_state.clone();

        let handle = std::thread::spawn(move || {
            SparseTrieCacheTask::<ParallelSparseTrie, ParallelSparseTrie>::run_hashing_task(
                updates_rx,
                hashed_state_tx,
            );
        });

        updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
        updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
        // Dropping the sender lets the task observe the end of its input.
        drop(updates_tx);

        let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
            panic!("expected HashedState message");
        };

        let account = received.accounts.get(&address).unwrap().unwrap();
        assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
        assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);

        let storage = received.storages.get(&address).unwrap();
        assert_eq!(*storage.storage.get(&slot).unwrap(), value);

        let second = hashed_state_rx.recv().unwrap();
        assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));

        // No further messages: the receiver must report a disconnected channel.
        assert!(hashed_state_rx.recv().is_err());
        handle.join().unwrap();
    }

    /// A missing account combined with the empty storage root encodes to an empty leaf value
    /// and leaves the scratch buffer as it was.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let mut account_rlp_buf = vec![0xAB];
        let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut account_rlp_buf);

        assert!(encoded.is_empty());
        // Early return should not touch the caller's buffer.
        assert_eq!(account_rlp_buf, vec![0xAB]);
    }

    /// A non-empty account encodes to RLP that decodes back to a `TrieAccount` with every
    /// field preserved, and the scratch buffer ends up holding exactly the returned bytes.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let storage_root = B256::from([0x99; 32]);
        let code_hash = B256::from([0xAA; 32]);
        let account =
            Some(Account { nonce: 7, balance: U256::from(42), bytecode_hash: Some(code_hash) });
        // Pre-filled on purpose: stale contents must not leak into the encoding.
        let mut account_rlp_buf = vec![0x00, 0x01];

        let encoded = encode_account_leaf_value(account, storage_root, &mut account_rlp_buf);
        let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");

        assert_eq!(decoded.nonce, 7);
        assert_eq!(decoded.balance, U256::from(42));
        assert_eq!(decoded.storage_root, storage_root);
        assert_eq!(decoded.code_hash, code_hash);
        assert_eq!(account_rlp_buf, encoded);
    }
}