Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use std::sync::Arc;
4
5use crate::tree::{
6    multiproof::{
7        dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8        StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9    },
10    payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19    updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20    TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24    proof_task::{
25        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26    },
27    root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30    errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
31    RevealableSparseTrie, SparseStateTrie, SparseTrie,
32};
33use revm_primitives::{hash_map::Entry, B256Map};
34use tracing::{debug, debug_span, error, instrument, trace_span};
35
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
///
/// The run loop compares its counter of received-but-unapplied messages against this value with a
/// strict `>`, so buffered updates are force-applied once more than this many have accumulated,
/// even while further messages are still queued on the updates channel.
const MAX_PENDING_UPDATES: usize = 100;
38
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
///
/// The task buffers incoming leaf updates, applies them to the sparse tries, requests proofs for
/// whatever is not revealed yet, and finally computes the state root.
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
    /// Sender for proof results.
    ///
    /// Keeping the sender half here means the receiving half below never observes a disconnect
    /// while the task is alive.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Receiver for proof results directly from workers.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Receives messages produced by the hashing task: hashed state updates coming from
    /// execution, prefetch (prewarming) targets, and the "finished state updates" marker.
    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
    /// `SparseStateTrie` used for computing the state root.
    trie: SparseStateTrie<A, S>,
    /// Handle to the proof worker pools (storage and account).
    proof_worker_handle: ProofWorkerHandle,

    /// The size of proof targets chunk to spawn in one calculation.
    ///
    /// The run loop also uses this value as the threshold of queued proof targets above which
    /// they are dispatched even though more updates are still waiting to be read.
    chunk_size: usize,
    /// If this number is exceeded and chunking is enabled, then this will override whether or not
    /// there are any active workers and force chunking across workers. This is to prevent tasks
    /// which are very long from hitting a single worker.
    max_targets_for_chunking: usize,

    /// Account trie updates.
    account_updates: B256Map<LeafUpdate>,
    /// Storage trie updates. hashed address -> slot -> update.
    storage_updates: B256Map<B256Map<LeafUpdate>>,

    /// Account updates that are buffered but were not yet applied to the trie.
    new_account_updates: B256Map<LeafUpdate>,
    /// Storage updates that are buffered but were not yet applied to the trie.
    /// hashed address -> slot -> update.
    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
    /// Account updates that are blocked by storage root calculation or account reveal.
    ///
    /// Those are being moved into `account_updates` once storage roots
    /// are revealed and/or calculated.
    ///
    /// Invariant: for each entry in `pending_account_updates` account must either be already
    /// revealed in the trie or have an entry in `account_updates`.
    ///
    /// Values can be either of:
    ///   - None: account had a storage update and is awaiting storage root calculation and/or
    ///     account node reveal to complete.
    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
    ///     to complete. The inner `Option<Account>` is stored exactly as it was received in the
    ///     hashed state update.
    pending_account_updates: B256Map<Option<Option<Account>>>,
    /// Cache of account proof targets that were already fetched/requested from the proof workers.
    /// account -> lowest `min_len` requested.
    fetched_account_targets: B256Map<u8>,
    /// Cache of storage proof targets that have already been fetched/requested from the proof
    /// workers. account -> slot -> lowest `min_len` requested.
    fetched_storage_targets: B256Map<B256Map<u8>>,
    /// Reusable buffer for RLP encoding of accounts.
    account_rlp_buf: Vec<u8>,
    /// Whether the last state update has been received.
    finished_state_updates: bool,
    /// Accumulated account leaf update cache hits, i.e. the number of account updates that
    /// `update_leaves` removed from the update map it was given.
    account_cache_hits: u64,
    /// Accumulated account leaf update cache misses, i.e. the number of account updates that
    /// were still left in the update map after `update_leaves` returned.
    account_cache_misses: u64,
    /// Accumulated storage leaf update cache hits (counted like the account hits, per storage
    /// trie).
    storage_cache_hits: u64,
    /// Accumulated storage leaf update cache misses (counted like the account misses, per
    /// storage trie).
    storage_cache_misses: u64,
    /// Pending proof targets queued for dispatch to proof workers.
    pending_targets: PendingTargets,
    /// Number of pending execution/prewarming updates received but not yet passed to
    /// `update_leaves`.
    pending_updates: usize,

    /// Metrics for the sparse trie.
    metrics: MultiProofTaskMetrics,
}
110
111impl<A, S> SparseTrieCacheTask<A, S>
112where
113    A: SparseTrie + Default,
114    S: SparseTrie + Default + Clone,
115{
116    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
117    pub(super) fn new_with_trie(
118        executor: &Runtime,
119        updates: CrossbeamReceiver<StateRootMessage>,
120        proof_worker_handle: ProofWorkerHandle,
121        metrics: MultiProofTaskMetrics,
122        trie: SparseStateTrie<A, S>,
123        chunk_size: usize,
124    ) -> Self {
125        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
126        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
127
128        let parent_span = tracing::Span::current();
129        let hashing_metrics = metrics.clone();
130        executor.spawn_blocking_named("trie-hashing", move || {
131            let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
132            Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
133        });
134
135        Self {
136            proof_result_tx,
137            proof_result_rx,
138            updates: hashed_state_rx,
139            proof_worker_handle,
140            trie,
141            chunk_size,
142            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
143            account_updates: Default::default(),
144            storage_updates: Default::default(),
145            new_account_updates: Default::default(),
146            new_storage_updates: Default::default(),
147            pending_account_updates: Default::default(),
148            fetched_account_targets: Default::default(),
149            fetched_storage_targets: Default::default(),
150            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
151            finished_state_updates: Default::default(),
152            account_cache_hits: 0,
153            account_cache_misses: 0,
154            storage_cache_hits: 0,
155            storage_cache_misses: 0,
156            pending_targets: Default::default(),
157            pending_updates: Default::default(),
158            metrics,
159        }
160    }
161
162    /// Runs the hashing task that drains updates from the channel and converts them to
163    /// `HashedPostState` in parallel.
164    fn run_hashing_task(
165        updates: CrossbeamReceiver<StateRootMessage>,
166        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
167        metrics: MultiProofTaskMetrics,
168    ) {
169        let mut total_idle_time = std::time::Duration::ZERO;
170        let mut idle_start = Instant::now();
171
172        while let Ok(message) = updates.recv() {
173            total_idle_time += idle_start.elapsed();
174
175            let msg = match message {
176                StateRootMessage::PrefetchProofs(targets) => {
177                    SparseTrieTaskMessage::PrefetchProofs(targets)
178                }
179                StateRootMessage::StateUpdate(_, state) => {
180                    let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
181                    let hashed = evm_state_to_hashed_post_state(state);
182                    SparseTrieTaskMessage::HashedState(hashed)
183                }
184                StateRootMessage::FinishedStateUpdates => {
185                    SparseTrieTaskMessage::FinishedStateUpdates
186                }
187                StateRootMessage::BlockAccessList(_) => {
188                    idle_start = Instant::now();
189                    continue;
190                }
191                StateRootMessage::HashedStateUpdate(state) => {
192                    SparseTrieTaskMessage::HashedState(state)
193                }
194            };
195            if hashed_state_tx.send(msg).is_err() {
196                break;
197            }
198
199            idle_start = Instant::now();
200        }
201
202        metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
203    }
204
205    /// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
206    ///
207    /// Should be called after the state root result has been sent.
208    ///
209    /// When `disable_pruning` is true, the trie is preserved without any node pruning,
210    /// storage trie eviction, or capacity shrinking, keeping the full cache intact for
211    /// benchmarking purposes.
212    pub(super) fn into_trie_for_reuse(
213        self,
214        max_hot_slots: usize,
215        max_hot_accounts: usize,
216        max_nodes_capacity: usize,
217        max_values_capacity: usize,
218        disable_pruning: bool,
219        updates: &TrieUpdates,
220    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
221        let Self { mut trie, .. } = self;
222        trie.commit_updates(updates);
223        if !disable_pruning {
224            trie.prune(max_hot_slots, max_hot_accounts);
225            trie.shrink_to(max_nodes_capacity, max_values_capacity);
226        }
227        let deferred = trie.take_deferred_drops();
228        (trie, deferred)
229    }
230
231    /// Clears and shrinks the trie, discarding all state.
232    ///
233    /// Use this when the payload was invalid or cancelled - we don't want to preserve
234    /// potentially invalid trie state, but we keep the allocations for reuse.
235    pub(super) fn into_cleared_trie(
236        self,
237        max_nodes_capacity: usize,
238        max_values_capacity: usize,
239    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
240        let Self { mut trie, .. } = self;
241        trie.clear();
242        trie.shrink_to(max_nodes_capacity, max_values_capacity);
243        let deferred = trie.take_deferred_drops();
244        (trie, deferred)
245    }
246
    /// Runs the sparse trie task to completion.
    ///
    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
    /// to the trie and schedules proof fetching when needed.
    ///
    /// This concludes once the last state update has been received and processed, i.e. once
    /// `finished_state_updates` is set and neither `account_updates` nor any map in
    /// `storage_updates` has entries left. The state root and trie updates are then computed
    /// and returned.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// - the updates channel disconnects before the loop has finished,
    /// - a received proof result carries an error,
    /// - revealing a proof, applying buffered updates or promoting pending account updates
    ///   fails, or
    /// - the final state root calculation fails.
    #[instrument(
        name = "SparseTrieCacheTask::run",
        level = "debug",
        target = "engine::tree::payload_processor::sparse_trie",
        skip_all
    )]
    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        // Start of the whole run, used for the total duration metric at the end.
        let now = Instant::now();

        // Idle time is measured from the end of one loop iteration until the next message is
        // received.
        let mut total_idle_time = std::time::Duration::ZERO;
        let mut idle_start = Instant::now();

        loop {
            // `t` marks the start of whichever phase is currently being timed.
            let mut t = Instant::now();
            // The updates channel is listed first in the biased select.
            crossbeam_channel::select_biased! {
                recv(self.updates) -> message => {
                    let wake = Instant::now();

                    let update = match message {
                        Ok(m) => m,
                        Err(_) => {
                            return Err(ParallelStateRootError::Other(
                                "updates channel disconnected before state root calculation".to_string(),
                            ))
                        }
                    };

                    total_idle_time += wake.duration_since(idle_start);
                    self.metrics
                        .sparse_trie_channel_wait_duration_histogram
                        .record(wake.duration_since(t));

                    // Only buffers the update; applying it to the trie happens further below.
                    // Every received message bumps the counter, whatever its variant.
                    self.on_message(update);
                    self.pending_updates += 1;
                }
                recv(self.proof_result_rx) -> message => {
                    let phase_end = Instant::now();
                    total_idle_time += phase_end.duration_since(idle_start);
                    self.metrics
                        .sparse_trie_channel_wait_duration_histogram
                        .record(phase_end.duration_since(t));
                    t = phase_end;

                    // `self.proof_result_tx` keeps the channel connected.
                    let Ok(result) = message else {
                        unreachable!("we own the sender half")
                    };

                    // Coalesce every proof result that is already queued into a single
                    // multiproof so that it is revealed in one go.
                    let mut result = result.result?;
                    while let Ok(next) = self.proof_result_rx.try_recv() {
                        let res = next.result?;
                        result.extend(res);
                    }

                    let phase_end = Instant::now();
                    self.metrics
                        .sparse_trie_proof_coalesce_duration_histogram
                        .record(phase_end.duration_since(t));
                    t = phase_end;

                    self.on_proof_result(result)?;
                    self.metrics
                        .sparse_trie_reveal_multiproof_duration_histogram
                        .record(t.elapsed());
                },
            }

            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
                // If we don't have any pending messages, we can spend some time on computing
                // storage roots and promoting account updates.
                self.dispatch_pending_targets();
                t = Instant::now();
                self.process_new_updates()?;
                self.promote_pending_account_updates()?;
                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());

                // Termination check: no further updates will arrive and none are outstanding.
                if self.finished_state_updates &&
                    self.account_updates.is_empty() &&
                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
                {
                    break;
                }

                // Dispatch the targets queued by the two calls above.
                self.dispatch_pending_targets();

                // If there's still no pending updates spend some time pre-computing the account
                // trie upper hashes
                if self.proof_result_rx.is_empty() {
                    self.trie.calculate_subtries();
                }
            } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
                // If we don't have any pending updates OR we've accumulated a lot already, apply
                // them to the trie,
                t = Instant::now();
                self.process_new_updates()?;
                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
                self.dispatch_pending_targets();
            } else if self.pending_targets.len() > self.chunk_size {
                // Make sure to dispatch targets if we've accumulated a lot of them.
                self.dispatch_pending_targets();
            }

            idle_start = Instant::now();
        }

        self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());

        debug!(target: "engine::root", "All proofs processed, ending calculation");

        let start = Instant::now();
        let (state_root, trie_updates) =
            self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
            })?;

        #[cfg(feature = "trie-debug")]
        let debug_recorders = self.trie.take_debug_recorders();

        let end = Instant::now();
        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));

        // Report the accumulated cache statistics and reset the counters afterwards.
        self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
        self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
        self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
        self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
        self.account_cache_hits = 0;
        self.account_cache_misses = 0;
        self.storage_cache_hits = 0;
        self.storage_cache_misses = 0;

        Ok(StateRootComputeOutcome {
            state_root,
            trie_updates: Arc::new(trie_updates),
            #[cfg(feature = "trie-debug")]
            debug_recorders,
        })
    }
390
391    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
392    fn on_message(&mut self, message: SparseTrieTaskMessage) {
393        match message {
394            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
395            SparseTrieTaskMessage::HashedState(hashed_state) => {
396                self.on_hashed_state_update(hashed_state)
397            }
398            SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
399        }
400    }
401
402    #[instrument(
403        level = "trace",
404        target = "engine::tree::payload_processor::sparse_trie",
405        skip_all
406    )]
407    fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
408        for target in targets.account_targets {
409            // Only touch accounts that are not yet present in the updates set.
410            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
411        }
412
413        for (address, slots) in targets.storage_targets {
414            if !slots.is_empty() {
415                // Look up outer map once per address instead of once per slot.
416                let new_updates = self.new_storage_updates.entry(address).or_default();
417                for slot in slots {
418                    // Only touch storages that are not yet present in the updates set.
419                    new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
420                }
421            }
422
423            // Touch corresponding account leaf to make sure its revealed in accounts trie for
424            // storage root update.
425            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
426        }
427    }
428
429    /// Processes a hashed state update and encodes all state changes as trie updates.
430    #[instrument(
431        level = "trace",
432        target = "engine::tree::payload_processor::sparse_trie",
433        skip_all
434    )]
435    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
436        for (address, storage) in hashed_state_update.storages {
437            if !storage.storage.is_empty() {
438                // Look up outer maps once per address instead of once per slot.
439                let new_updates = self.new_storage_updates.entry(address).or_default();
440                let mut existing_updates = self.storage_updates.get_mut(&address);
441
442                for (slot, value) in storage.storage {
443                    self.trie.record_slot_touch(address, slot);
444
445                    let encoded = if value.is_zero() {
446                        Vec::new()
447                    } else {
448                        alloy_rlp::encode_fixed_size(&value).to_vec()
449                    };
450                    new_updates.insert(slot, LeafUpdate::Changed(encoded));
451
452                    // Remove an existing storage update if it exists.
453                    if let Some(ref mut existing) = existing_updates {
454                        existing.remove(&slot);
455                    }
456                }
457            }
458
459            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
460            // trie for storage root update.
461            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
462
463            // Make sure account is tracked in `pending_account_updates` so that once storage root
464            // is computed, it will be updated in the accounts trie.
465            self.pending_account_updates.entry(address).or_insert(None);
466        }
467
468        for (address, account) in hashed_state_update.accounts {
469            self.trie.record_account_touch(address);
470
471            // Track account as touched.
472            //
473            // This might overwrite an existing update, which is fine, because storage root from it
474            // is already tracked in the trie and can be easily fetched again.
475            self.new_account_updates.insert(address, LeafUpdate::Touched);
476
477            // Track account in `pending_account_updates` so that once storage root is computed,
478            // it will be updated in the accounts trie.
479            self.pending_account_updates.insert(address, Some(account));
480        }
481    }
482
483    fn on_proof_result(
484        &mut self,
485        result: DecodedMultiProofV2,
486    ) -> Result<(), ParallelStateRootError> {
487        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
488            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
489        })
490    }
491
492    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
493        if self.pending_updates == 0 {
494            return Ok(());
495        }
496
497        let _span = debug_span!("process_new_updates").entered();
498        self.pending_updates = 0;
499
500        // Firstly apply all new storage and account updates to the tries.
501        self.process_leaf_updates(true)?;
502
503        for (address, mut new) in self.new_storage_updates.drain() {
504            match self.storage_updates.entry(address) {
505                Entry::Vacant(entry) => {
506                    entry.insert(new); // insert the whole map at once, no per-slot loop
507                }
508                Entry::Occupied(mut entry) => {
509                    let updates = entry.get_mut();
510                    for (slot, new) in new.drain() {
511                        match updates.entry(slot) {
512                            Entry::Occupied(mut slot_entry) => {
513                                if new.is_changed() {
514                                    slot_entry.insert(new);
515                                }
516                            }
517                            Entry::Vacant(slot_entry) => {
518                                slot_entry.insert(new);
519                            }
520                        }
521                    }
522                }
523            }
524        }
525
526        for (address, new) in self.new_account_updates.drain() {
527            match self.account_updates.entry(address) {
528                Entry::Occupied(mut entry) => {
529                    if new.is_changed() {
530                        entry.insert(new);
531                    }
532                }
533                Entry::Vacant(entry) => {
534                    entry.insert(new);
535                }
536            }
537        }
538
539        Ok(())
540    }
541
542    /// Applies all account and storage leaf updates to corresponding tries and collects any new
543    /// multiproof targets.
544    #[instrument(
545        level = "trace",
546        target = "engine::tree::payload_processor::sparse_trie",
547        skip_all
548    )]
549    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
550        let storage_updates =
551            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
552
553        // Process all storage updates, skipping tries with no pending updates.
554        let span = trace_span!("process_storage_leaf_updates").entered();
555        for (address, updates) in storage_updates {
556            if updates.is_empty() {
557                continue;
558            }
559            let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
560
561            let trie = self.trie.get_or_create_storage_trie_mut(*address);
562            let fetched = self.fetched_storage_targets.entry(*address).or_default();
563            let mut targets = Vec::new();
564
565            let updates_len_before = updates.len();
566            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
567                Entry::Occupied(mut entry) => {
568                    if min_len < *entry.get() {
569                        entry.insert(min_len);
570                        targets.push(ProofV2Target::new(path).with_min_len(min_len));
571                    }
572                }
573                Entry::Vacant(entry) => {
574                    entry.insert(min_len);
575                    targets.push(ProofV2Target::new(path).with_min_len(min_len));
576                }
577            })?;
578            let updates_len_after = updates.len();
579            self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
580            self.storage_cache_misses += updates_len_after as u64;
581
582            if !targets.is_empty() {
583                self.pending_targets.extend_storage_targets(address, targets);
584            }
585        }
586
587        drop(span);
588
589        // Process account trie updates and fill the account targets.
590        self.process_account_leaf_updates(new)?;
591
592        Ok(())
593    }
594
595    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
596    ///
597    /// Returns whether any updates were drained (applied to the trie).
598    #[instrument(
599        level = "trace",
600        target = "engine::tree::payload_processor::sparse_trie",
601        skip_all
602    )]
603    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
604        let account_updates =
605            if new { &mut self.new_account_updates } else { &mut self.account_updates };
606
607        let updates_len_before = account_updates.len();
608
609        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
610            match self.fetched_account_targets.entry(target) {
611                Entry::Occupied(mut entry) => {
612                    if min_len < *entry.get() {
613                        entry.insert(min_len);
614                        self.pending_targets
615                            .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
616                    }
617                }
618                Entry::Vacant(entry) => {
619                    entry.insert(min_len);
620                    self.pending_targets
621                        .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
622                }
623            }
624        })?;
625
626        let updates_len_after = account_updates.len();
627        self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
628        self.account_cache_misses += updates_len_after as u64;
629
630        Ok(updates_len_after < updates_len_before)
631    }
632
    /// Computes storage roots for accounts whose storage updates are fully drained.
    ///
    /// For each storage trie T that:
    /// 1. was modified in the current block,
    /// 2. all the storage updates are fully drained,
    /// 3. but the storage root hasn't been updated yet,
    ///
    /// we trigger storage root computation on a rayon pool.
    ///
    /// In terms of the code: conditions 1 and 2 are "has an entry in `storage_updates` and that
    /// entry is empty", condition 3 is "`is_root_cached()` returns `false`".
    ///
    /// # Panics
    ///
    /// Panics if computing the root of one of the selected storage tries fails.
    fn compute_drained_storage_roots(&mut self) {
        // Addresses whose update map exists but holds no more updates.
        let addresses_to_compute_roots: Vec<_> = self
            .storage_updates
            .iter()
            .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
            .collect();

        // Raw-pointer wrapper that lets the mutable trie references be handed to rayon tasks.
        struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
        // SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
        // documented at the use site below.
        unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}

        // Collect a pointer for every drained trie whose root is not cached yet.
        let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
            Vec::with_capacity(addresses_to_compute_roots.len());
        for address in addresses_to_compute_roots {
            if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
                !trie.is_root_cached()
            {
                tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
            }
        }

        if tries_to_compute_roots.is_empty() {
            return;
        }

        let parent_span =
            debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
        tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
            // The address is attached to the per-trie span only when TRACE level is enabled.
            let span = if tracing::enabled!(tracing::Level::TRACE) {
                debug_span!(
                    target: "engine::tree::payload_processor::sparse_trie",
                    parent: &parent_span,
                    "storage_root",
                    ?address
                )
            } else {
                debug_span!(
                    target: "engine::tree::payload_processor::sparse_trie",
                    parent: &parent_span,
                    "storage_root",
                )
            };
            let _enter = span.entered();
            // SAFETY:
            // - pointers are created from `storage_tries_mut().get_mut(address)` above;
            // - `addresses_to_compute_roots` comes from map iteration, so addresses are unique;
            // - we do not insert/remove entries between pointer collection and use, so pointers
            //   stay valid and map reallocation cannot occur;
            // - each pointer is consumed by at most one rayon task, so no aliasing mutable access.
            unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
        });
    }
694
695    /// Iterates through all storage tries for which all updates were processed, computes their
696    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
697    /// for accounts trie.
698    #[instrument(
699        level = "trace",
700        target = "engine::tree::payload_processor::sparse_trie",
701        skip_all
702    )]
703    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
704        self.process_leaf_updates(false)?;
705
706        if self.pending_account_updates.is_empty() {
707            return Ok(());
708        }
709
710        self.compute_drained_storage_roots();
711
712        loop {
713            let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
714            // Now handle pending account updates that can be upgraded to a proper update.
715            let account_rlp_buf = &mut self.account_rlp_buf;
716            let mut num_promoted = 0;
717            self.pending_account_updates.retain(|addr, account| {
718                if let Some(updates) = self.storage_updates.get(addr) {
719                    if !updates.is_empty() {
720                        // If account has pending storage updates, it is still pending.
721                        return true;
722                    } else if let Some(account) = account.take() {
723                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
724                        let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
725                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
726                        num_promoted += 1;
727                        return false;
728                    }
729                }
730
731                // Get the current account state either from the trie or from latest account update.
732                let trie_account = match self.account_updates.get(addr) {
733                    Some(LeafUpdate::Changed(encoded)) => {
734                        Some(encoded).filter(|encoded| !encoded.is_empty())
735                    }
736                    // Needs to be revealed first
737                    Some(LeafUpdate::Touched) => return true,
738                    None => self.trie.get_account_value(addr),
739                };
740
741                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
742
743                let (account, storage_root) = if let Some(account) = account.take() {
744                    // If account is Some(_) here it means it didn't have any storage updates
745                    // and we can fetch the storage root directly from the account trie.
746                    //
747                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
748                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
749
750                    (account, storage_root)
751                } else {
752                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
753                };
754
755                let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
756                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
757                num_promoted += 1;
758
759                false
760            });
761            span.record("promoted", num_promoted);
762            drop(span);
763
764            // Only exit when no new updates are processed.
765            //
766            // We need to keep iterating if any updates are being drained because that might
767            // indicate that more pending account updates can be promoted.
768            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
769                break
770            }
771        }
772
773        Ok(())
774    }
775
776    fn dispatch_pending_targets(&mut self) {
777        if self.pending_targets.is_empty() {
778            return;
779        }
780
781        let _span = trace_span!("dispatch_pending_targets").entered();
782        let (targets, chunking_length) = self.pending_targets.take();
783        dispatch_with_chunking(
784            targets,
785            chunking_length,
786            self.chunk_size,
787            self.max_targets_for_chunking,
788            self.proof_worker_handle.has_multiple_idle_account_workers(),
789            self.proof_worker_handle.has_multiple_idle_storage_workers(),
790            MultiProofTargetsV2::chunks,
791            |proof_targets| {
792                if let Err(e) =
793                    self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
794                        targets: proof_targets,
795                        proof_result_sender: ProofResultContext::new(
796                            self.proof_result_tx.clone(),
797                            HashedPostState::default(),
798                            Instant::now(),
799                        ),
800                    })
801                {
802                    error!("failed to dispatch account multiproof: {e:?}");
803                }
804            },
805        );
806    }
807}
808
809/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
810fn encode_account_leaf_value(
811    account: Option<Account>,
812    storage_root: B256,
813    account_rlp_buf: &mut Vec<u8>,
814) -> Vec<u8> {
815    if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
816        return Vec::new();
817    }
818
819    account_rlp_buf.clear();
820    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
821    account_rlp_buf.clone()
822}
823
824/// Pending proof targets queued for dispatch to proof workers, along with their count.
825#[derive(Default)]
826struct PendingTargets {
827    /// The proof targets.
828    targets: MultiProofTargetsV2,
829    /// Number of account + storage proof targets currently queued.
830    len: usize,
831}
832
833impl PendingTargets {
834    /// Returns the number of pending targets.
835    const fn len(&self) -> usize {
836        self.len
837    }
838
839    /// Returns `true` if there are no pending targets.
840    const fn is_empty(&self) -> bool {
841        self.len == 0
842    }
843
844    /// Takes the pending targets, replacing with empty defaults.
845    fn take(&mut self) -> (MultiProofTargetsV2, usize) {
846        (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
847    }
848
849    /// Adds a target to the account targets.
850    fn push_account_target(&mut self, target: ProofV2Target) {
851        self.targets.account_targets.push(target);
852        self.len += 1;
853    }
854
855    /// Extends storage targets for the given address.
856    fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
857        self.len += targets.len();
858        self.targets.storage_targets.entry(*address).or_default().extend(targets);
859    }
860}
861
862/// Message type for the sparse trie task.
863enum SparseTrieTaskMessage {
864    /// A hashed state update ready to be processed.
865    HashedState(HashedPostState),
866    /// Prefetch proof targets (passed through directly).
867    PrefetchProofs(MultiProofTargetsV2),
868    /// Signals that all state updates have been received.
869    FinishedStateUpdates,
870}
871
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_trie_sparse::ArenaParallelSparseTrie;

    /// A `HashedStateUpdate` followed by `FinishedStateUpdates` must come out of the hashing
    /// task as `HashedState` and `FinishedStateUpdates`, after which the channel closes.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);
        let sent_account = Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

        // Build a state update holding one account and one storage slot for it.
        let mut update = HashedPostState::default();
        update.accounts.insert(hashed_address, Some(sent_account));
        let mut account_storage = reth_trie::HashedStorage::new(false);
        account_storage.storage.insert(hashed_slot, slot_value);
        update.storages.insert(hashed_address, account_storage);

        let (input_tx, input_rx) = crossbeam_channel::unbounded();
        let (output_tx, output_rx) = crossbeam_channel::unbounded();

        let worker = std::thread::spawn(move || {
            SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
                input_rx,
                output_tx,
                MultiProofTaskMetrics::default(),
            );
        });

        input_tx.send(StateRootMessage::HashedStateUpdate(update)).unwrap();
        input_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
        drop(input_tx);

        // First message: the forwarded hashed state.
        let forwarded = match output_rx.recv().unwrap() {
            SparseTrieTaskMessage::HashedState(state) => state,
            _ => panic!("expected HashedState message"),
        };

        let forwarded_account = forwarded.accounts.get(&hashed_address).unwrap().unwrap();
        assert_eq!(forwarded_account.balance, sent_account.balance);
        assert_eq!(forwarded_account.nonce, sent_account.nonce);

        let forwarded_storage = forwarded.storages.get(&hashed_address).unwrap();
        assert_eq!(forwarded_storage.storage.get(&hashed_slot), Some(&slot_value));

        // Second message: the end-of-updates marker.
        let marker = output_rx.recv().unwrap();
        assert!(matches!(marker, SparseTrieTaskMessage::FinishedStateUpdates));

        // Nothing else is sent and the sending side is dropped once the task finishes.
        assert!(output_rx.recv().is_err());
        worker.join().unwrap();
    }

    /// No account plus the empty storage root encodes to an empty value and leaves the scratch
    /// buffer as it was.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let mut scratch = vec![0xAB];

        let leaf = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

        assert!(leaf.is_empty());
        // Early return should not touch the caller's buffer.
        assert_eq!(scratch, vec![0xAB]);
    }

    /// A non-empty account is RLP-encoded as a `TrieAccount`, and the scratch buffer ends up
    /// holding exactly the returned bytes.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let root = B256::from([0x99; 32]);
        let mut scratch = vec![0x00, 0x01];
        let input = Account {
            nonce: 7,
            balance: U256::from(42),
            bytecode_hash: Some(B256::from([0xAA; 32])),
        };

        let leaf = encode_account_leaf_value(Some(input), root, &mut scratch);
        let roundtrip = TrieAccount::decode(&mut &leaf[..]).expect("valid account RLP");

        assert_eq!(roundtrip.nonce, 7);
        assert_eq!(roundtrip.balance, U256::from(42));
        assert_eq!(roundtrip.storage_root, root);
        assert_eq!(scratch, leaf);
    }
}