Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use std::sync::Arc;
4
5use crate::tree::{
6    multiproof::{
7        dispatch_with_chunking, evm_state_to_hashed_post_state, MultiProofMessage,
8        DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9    },
10    payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19    updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20    TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24    proof_task::{
25        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26    },
27    root::ParallelStateRootError,
28};
29#[cfg(feature = "trie-debug")]
30use reth_trie_sparse::debug_recorder::TrieDebugRecorder;
31use reth_trie_sparse::{
32    errors::SparseTrieResult, ConfigurableSparseTrie, DeferredDrops, LeafUpdate,
33    RevealableSparseTrie, SparseStateTrie, SparseTrie,
34};
35use revm_primitives::{hash_map::Entry, B256Map};
36use tracing::{debug, debug_span, error, instrument, trace_span};
37
/// Upper bound on the number of execution/prewarm messages that may be buffered in memory.
///
/// Once the count of buffered messages exceeds this value, the run loop applies them to the trie
/// even if more messages are still waiting in the channel.
const MAX_PENDING_UPDATES: usize = 100;
40
41/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
42pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
43    /// Sender for proof results.
44    proof_result_tx: CrossbeamSender<ProofResultMessage>,
45    /// Receiver for proof results directly from workers.
46    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
47    /// Receives updates from execution and prewarming.
48    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
49    /// `SparseStateTrie` used for computing the state root.
50    trie: SparseStateTrie<A, S>,
51    /// Handle to the proof worker pools (storage and account).
52    proof_worker_handle: ProofWorkerHandle,
53
54    /// The size of proof targets chunk to spawn in one calculation.
55    /// If None, chunking is disabled and all targets are processed in a single proof.
56    chunk_size: usize,
57    /// If this number is exceeded and chunking is enabled, then this will override whether or not
58    /// there are any active workers and force chunking across workers. This is to prevent tasks
59    /// which are very long from hitting a single worker.
60    max_targets_for_chunking: usize,
61
62    /// Account trie updates.
63    account_updates: B256Map<LeafUpdate>,
64    /// Storage trie updates. hashed address -> slot -> update.
65    storage_updates: B256Map<B256Map<LeafUpdate>>,
66
67    /// Account updates that are buffered but were not yet applied to the trie.
68    new_account_updates: B256Map<LeafUpdate>,
69    /// Storage updates that are buffered but were not yet applied to the trie.
70    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
71    /// Account updates that are blocked by storage root calculation or account reveal.
72    ///
73    /// Those are being moved into `account_updates` once storage roots
74    /// are revealed and/or calculated.
75    ///
76    /// Invariant: for each entry in `pending_account_updates` account must either be already
77    /// revealed in the trie or have an entry in `account_updates`.
78    ///
79    /// Values can be either of:
80    ///   - None: account had a storage update and is awaiting storage root calculation and/or
81    ///     account node reveal to complete.
82    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
83    ///     to complete.
84    pending_account_updates: B256Map<Option<Option<Account>>>,
85    /// Cache of account proof targets that were already fetched/requested from the proof workers.
86    /// account -> lowest `min_len` requested.
87    fetched_account_targets: B256Map<u8>,
88    /// Cache of storage proof targets that have already been fetched/requested from the proof
89    /// workers. account -> slot -> lowest `min_len` requested.
90    fetched_storage_targets: B256Map<B256Map<u8>>,
91    /// Reusable buffer for RLP encoding of accounts.
92    account_rlp_buf: Vec<u8>,
93    /// Whether the last state update has been received.
94    finished_state_updates: bool,
95    /// Accumulated account leaf update cache hits.
96    account_cache_hits: u64,
97    /// Accumulated account leaf update cache misses.
98    account_cache_misses: u64,
99    /// Accumulated storage leaf update cache hits.
100    storage_cache_hits: u64,
101    /// Accumulated storage leaf update cache misses.
102    storage_cache_misses: u64,
103    /// Pending proof targets queued for dispatch to proof workers.
104    pending_targets: PendingTargets,
105    /// Number of pending execution/prewarming updates received but not yet passed to
106    /// `update_leaves`.
107    pending_updates: usize,
108
109    /// Metrics for the sparse trie.
110    metrics: MultiProofTaskMetrics,
111}
112
113impl<A, S> SparseTrieCacheTask<A, S>
114where
115    A: SparseTrie + Default,
116    S: SparseTrie + Default + Clone,
117{
118    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
119    pub(super) fn new_with_trie(
120        executor: &Runtime,
121        updates: CrossbeamReceiver<MultiProofMessage>,
122        proof_worker_handle: ProofWorkerHandle,
123        metrics: MultiProofTaskMetrics,
124        trie: SparseStateTrie<A, S>,
125        chunk_size: usize,
126    ) -> Self {
127        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
128        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
129
130        let parent_span = tracing::Span::current();
131        let hashing_metrics = metrics.clone();
132        executor.spawn_blocking_named("trie-hashing", move || {
133            let _span = debug_span!(parent: parent_span, "run_hashing_task").entered();
134            Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
135        });
136
137        Self {
138            proof_result_tx,
139            proof_result_rx,
140            updates: hashed_state_rx,
141            proof_worker_handle,
142            trie,
143            chunk_size,
144            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
145            account_updates: Default::default(),
146            storage_updates: Default::default(),
147            new_account_updates: Default::default(),
148            new_storage_updates: Default::default(),
149            pending_account_updates: Default::default(),
150            fetched_account_targets: Default::default(),
151            fetched_storage_targets: Default::default(),
152            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
153            finished_state_updates: Default::default(),
154            account_cache_hits: 0,
155            account_cache_misses: 0,
156            storage_cache_hits: 0,
157            storage_cache_misses: 0,
158            pending_targets: Default::default(),
159            pending_updates: Default::default(),
160            metrics,
161        }
162    }
163
164    /// Runs the hashing task that drains updates from the channel and converts them to
165    /// `HashedPostState` in parallel.
166    fn run_hashing_task(
167        updates: CrossbeamReceiver<MultiProofMessage>,
168        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
169        metrics: MultiProofTaskMetrics,
170    ) {
171        let mut total_idle_time = std::time::Duration::ZERO;
172        let mut idle_start = Instant::now();
173
174        while let Ok(message) = updates.recv() {
175            total_idle_time += idle_start.elapsed();
176
177            let msg = match message {
178                MultiProofMessage::PrefetchProofs(targets) => {
179                    SparseTrieTaskMessage::PrefetchProofs(targets)
180                }
181                MultiProofMessage::StateUpdate(_, state) => {
182                    let _span = debug_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
183                    let hashed = evm_state_to_hashed_post_state(state);
184                    SparseTrieTaskMessage::HashedState(hashed)
185                }
186                MultiProofMessage::FinishedStateUpdates => {
187                    SparseTrieTaskMessage::FinishedStateUpdates
188                }
189                MultiProofMessage::BlockAccessList(_) => {
190                    idle_start = Instant::now();
191                    continue;
192                }
193                MultiProofMessage::HashedStateUpdate(state) => {
194                    SparseTrieTaskMessage::HashedState(state)
195                }
196            };
197            if hashed_state_tx.send(msg).is_err() {
198                break;
199            }
200
201            idle_start = Instant::now();
202        }
203
204        metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
205    }
206
207    /// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
208    ///
209    /// Should be called after the state root result has been sent.
210    ///
211    /// When `disable_pruning` is true, the trie is preserved without any node pruning,
212    /// storage trie eviction, or capacity shrinking, keeping the full cache intact for
213    /// benchmarking purposes.
214    pub(super) fn into_trie_for_reuse(
215        self,
216        max_hot_slots: usize,
217        max_hot_accounts: usize,
218        max_nodes_capacity: usize,
219        max_values_capacity: usize,
220        disable_pruning: bool,
221        updates: &TrieUpdates,
222    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
223        let Self { mut trie, .. } = self;
224        trie.commit_updates(updates);
225        if !disable_pruning {
226            trie.prune(max_hot_slots, max_hot_accounts);
227            trie.shrink_to(max_nodes_capacity, max_values_capacity);
228        }
229        let deferred = trie.take_deferred_drops();
230        (trie, deferred)
231    }
232
233    /// Clears and shrinks the trie, discarding all state.
234    ///
235    /// Use this when the payload was invalid or cancelled - we don't want to preserve
236    /// potentially invalid trie state, but we keep the allocations for reuse.
237    pub(super) fn into_cleared_trie(
238        self,
239        max_nodes_capacity: usize,
240        max_values_capacity: usize,
241    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
242        let Self { mut trie, .. } = self;
243        trie.clear();
244        trie.shrink_to(max_nodes_capacity, max_values_capacity);
245        let deferred = trie.take_deferred_drops();
246        (trie, deferred)
247    }
248
249    /// Runs the sparse trie task to completion.
250    ///
251    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
252    /// to the trie and schedules proof fetching when needed.
253    ///
254    /// This concludes once the last state update has been received and processed.
255    #[instrument(
256        name = "SparseTrieCacheTask::run",
257        level = "debug",
258        target = "engine::tree::payload_processor::sparse_trie",
259        skip_all
260    )]
261    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
262        let now = Instant::now();
263
264        let mut total_idle_time = std::time::Duration::ZERO;
265        let mut idle_start = Instant::now();
266
267        loop {
268            let mut t = Instant::now();
269            crossbeam_channel::select_biased! {
270                recv(self.updates) -> message => {
271                    let wake = Instant::now();
272
273                    let update = match message {
274                        Ok(m) => m,
275                        Err(_) => {
276                            return Err(ParallelStateRootError::Other(
277                                "updates channel disconnected before state root calculation".to_string(),
278                            ))
279                        }
280                    };
281
282                    total_idle_time += wake.duration_since(idle_start);
283                    self.metrics
284                        .sparse_trie_channel_wait_duration_histogram
285                        .record(wake.duration_since(t));
286
287                    self.on_message(update);
288                    self.pending_updates += 1;
289                }
290                recv(self.proof_result_rx) -> message => {
291                    let phase_end = Instant::now();
292                    total_idle_time += phase_end.duration_since(idle_start);
293                    self.metrics
294                        .sparse_trie_channel_wait_duration_histogram
295                        .record(phase_end.duration_since(t));
296                    t = phase_end;
297
298                    let Ok(result) = message else {
299                        unreachable!("we own the sender half")
300                    };
301
302                    let mut result = result.result?;
303                    while let Ok(next) = self.proof_result_rx.try_recv() {
304                        let res = next.result?;
305                        result.extend(res);
306                    }
307
308                    let phase_end = Instant::now();
309                    self.metrics
310                        .sparse_trie_proof_coalesce_duration_histogram
311                        .record(phase_end.duration_since(t));
312                    t = phase_end;
313
314                    self.on_proof_result(result)?;
315                    self.metrics
316                        .sparse_trie_reveal_multiproof_duration_histogram
317                        .record(t.elapsed());
318                },
319            }
320
321            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
322                // If we don't have any pending messages, we can spend some time on computing
323                // storage roots and promoting account updates.
324                self.dispatch_pending_targets();
325                t = Instant::now();
326                self.process_new_updates()?;
327                self.promote_pending_account_updates()?;
328                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
329
330                if self.finished_state_updates &&
331                    self.account_updates.is_empty() &&
332                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
333                {
334                    break;
335                }
336
337                self.dispatch_pending_targets();
338
339                // If there's still no pending updates spend some time pre-computing the account
340                // trie upper hashes
341                if self.proof_result_rx.is_empty() {
342                    self.trie.calculate_subtries();
343                }
344            } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
345                // If we don't have any pending updates OR we've accumulated a lot already, apply
346                // them to the trie,
347                t = Instant::now();
348                self.process_new_updates()?;
349                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
350                self.dispatch_pending_targets();
351            } else if self.pending_targets.len() > self.chunk_size {
352                // Make sure to dispatch targets if we've accumulated a lot of them.
353                self.dispatch_pending_targets();
354            }
355
356            idle_start = Instant::now();
357        }
358
359        self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
360
361        debug!(target: "engine::root", "All proofs processed, ending calculation");
362
363        let start = Instant::now();
364        let (state_root, trie_updates) =
365            self.trie.root_with_updates(&self.proof_worker_handle).map_err(|e| {
366                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
367            })?;
368
369        #[cfg(feature = "trie-debug")]
370        let debug_recorders = self.trie.take_debug_recorders();
371
372        let end = Instant::now();
373        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
374        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
375
376        self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
377        self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
378        self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
379        self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
380        self.account_cache_hits = 0;
381        self.account_cache_misses = 0;
382        self.storage_cache_hits = 0;
383        self.storage_cache_misses = 0;
384
385        Ok(StateRootComputeOutcome {
386            state_root,
387            trie_updates: Arc::new(trie_updates),
388            #[cfg(feature = "trie-debug")]
389            debug_recorders,
390        })
391    }
392
393    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
394    fn on_message(&mut self, message: SparseTrieTaskMessage) {
395        match message {
396            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
397            SparseTrieTaskMessage::HashedState(hashed_state) => {
398                self.on_hashed_state_update(hashed_state)
399            }
400            SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
401        }
402    }
403
404    #[instrument(
405        level = "trace",
406        target = "engine::tree::payload_processor::sparse_trie",
407        skip_all
408    )]
409    fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
410        for target in targets.account_targets {
411            // Only touch accounts that are not yet present in the updates set.
412            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
413        }
414
415        for (address, slots) in targets.storage_targets {
416            if !slots.is_empty() {
417                // Look up outer map once per address instead of once per slot.
418                let new_updates = self.new_storage_updates.entry(address).or_default();
419                for slot in slots {
420                    // Only touch storages that are not yet present in the updates set.
421                    new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
422                }
423            }
424
425            // Touch corresponding account leaf to make sure its revealed in accounts trie for
426            // storage root update.
427            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
428        }
429    }
430
431    /// Processes a hashed state update and encodes all state changes as trie updates.
432    #[instrument(
433        level = "trace",
434        target = "engine::tree::payload_processor::sparse_trie",
435        skip_all
436    )]
437    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
438        for (address, storage) in hashed_state_update.storages {
439            if !storage.storage.is_empty() {
440                // Look up outer maps once per address instead of once per slot.
441                let new_updates = self.new_storage_updates.entry(address).or_default();
442                let mut existing_updates = self.storage_updates.get_mut(&address);
443
444                for (slot, value) in storage.storage {
445                    self.trie.record_slot_touch(address, slot);
446
447                    let encoded = if value.is_zero() {
448                        Vec::new()
449                    } else {
450                        alloy_rlp::encode_fixed_size(&value).to_vec()
451                    };
452                    new_updates.insert(slot, LeafUpdate::Changed(encoded));
453
454                    // Remove an existing storage update if it exists.
455                    if let Some(ref mut existing) = existing_updates {
456                        existing.remove(&slot);
457                    }
458                }
459            }
460
461            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
462            // trie for storage root update.
463            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
464
465            // Make sure account is tracked in `pending_account_updates` so that once storage root
466            // is computed, it will be updated in the accounts trie.
467            self.pending_account_updates.entry(address).or_insert(None);
468        }
469
470        for (address, account) in hashed_state_update.accounts {
471            self.trie.record_account_touch(address);
472
473            // Track account as touched.
474            //
475            // This might overwrite an existing update, which is fine, because storage root from it
476            // is already tracked in the trie and can be easily fetched again.
477            self.new_account_updates.insert(address, LeafUpdate::Touched);
478
479            // Track account in `pending_account_updates` so that once storage root is computed,
480            // it will be updated in the accounts trie.
481            self.pending_account_updates.insert(address, Some(account));
482        }
483    }
484
485    fn on_proof_result(
486        &mut self,
487        result: DecodedMultiProofV2,
488    ) -> Result<(), ParallelStateRootError> {
489        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
490            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
491        })
492    }
493
494    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
495        if self.pending_updates == 0 {
496            return Ok(());
497        }
498
499        let _span = debug_span!("process_new_updates").entered();
500        self.pending_updates = 0;
501
502        // Firstly apply all new storage and account updates to the tries.
503        self.process_leaf_updates(true)?;
504
505        for (address, mut new) in self.new_storage_updates.drain() {
506            match self.storage_updates.entry(address) {
507                Entry::Vacant(entry) => {
508                    entry.insert(new); // insert the whole map at once, no per-slot loop
509                }
510                Entry::Occupied(mut entry) => {
511                    let updates = entry.get_mut();
512                    for (slot, new) in new.drain() {
513                        match updates.entry(slot) {
514                            Entry::Occupied(mut slot_entry) => {
515                                if new.is_changed() {
516                                    slot_entry.insert(new);
517                                }
518                            }
519                            Entry::Vacant(slot_entry) => {
520                                slot_entry.insert(new);
521                            }
522                        }
523                    }
524                }
525            }
526        }
527
528        for (address, new) in self.new_account_updates.drain() {
529            match self.account_updates.entry(address) {
530                Entry::Occupied(mut entry) => {
531                    if new.is_changed() {
532                        entry.insert(new);
533                    }
534                }
535                Entry::Vacant(entry) => {
536                    entry.insert(new);
537                }
538            }
539        }
540
541        Ok(())
542    }
543
544    /// Applies all account and storage leaf updates to corresponding tries and collects any new
545    /// multiproof targets.
546    #[instrument(
547        level = "debug",
548        target = "engine::tree::payload_processor::sparse_trie",
549        skip_all
550    )]
551    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
552        let storage_updates =
553            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
554
555        // Process all storage updates, skipping tries with no pending updates.
556        let span = debug_span!("process_storage_leaf_updates").entered();
557        for (address, updates) in storage_updates {
558            if updates.is_empty() {
559                continue;
560            }
561            let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
562
563            let trie = self.trie.get_or_create_storage_trie_mut(*address);
564            let fetched = self.fetched_storage_targets.entry(*address).or_default();
565            let mut targets = Vec::new();
566
567            let updates_len_before = updates.len();
568            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
569                Entry::Occupied(mut entry) => {
570                    if min_len < *entry.get() {
571                        entry.insert(min_len);
572                        targets.push(ProofV2Target::new(path).with_min_len(min_len));
573                    }
574                }
575                Entry::Vacant(entry) => {
576                    entry.insert(min_len);
577                    targets.push(ProofV2Target::new(path).with_min_len(min_len));
578                }
579            })?;
580            let updates_len_after = updates.len();
581            self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
582            self.storage_cache_misses += updates_len_after as u64;
583
584            if !targets.is_empty() {
585                self.pending_targets.extend_storage_targets(address, targets);
586            }
587        }
588
589        drop(span);
590
591        // Process account trie updates and fill the account targets.
592        self.process_account_leaf_updates(new)?;
593
594        Ok(())
595    }
596
597    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
598    ///
599    /// Returns whether any updates were drained (applied to the trie).
600    #[instrument(
601        level = "debug",
602        target = "engine::tree::payload_processor::sparse_trie",
603        skip_all
604    )]
605    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
606        let account_updates =
607            if new { &mut self.new_account_updates } else { &mut self.account_updates };
608
609        let updates_len_before = account_updates.len();
610
611        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
612            match self.fetched_account_targets.entry(target) {
613                Entry::Occupied(mut entry) => {
614                    if min_len < *entry.get() {
615                        entry.insert(min_len);
616                        self.pending_targets
617                            .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
618                    }
619                }
620                Entry::Vacant(entry) => {
621                    entry.insert(min_len);
622                    self.pending_targets
623                        .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
624                }
625            }
626        })?;
627
628        let updates_len_after = account_updates.len();
629        self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
630        self.account_cache_misses += updates_len_after as u64;
631
632        Ok(updates_len_after < updates_len_before)
633    }
634
635    /// Computes storage roots for accounts whose storage updates are fully drained.
636    ///
637    /// For each storage trie T that:
638    /// 1. was modified in the current block,
639    /// 2. all the storage updates are fully drained,
640    /// 3. but the storage root hasn't been updated yet,
641    ///
642    /// we trigger state root computation on a rayon pool.
643    #[instrument(
644        level = "debug",
645        target = "engine::tree::payload_processor::sparse_trie",
646        skip_all
647    )]
648    fn compute_drained_storage_roots(&mut self) {
649        let addresses_to_compute_roots: Vec<_> = self
650            .storage_updates
651            .iter()
652            .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
653            .collect();
654
655        struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
656        // SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
657        // documented at the use site below.
658        unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
659
660        let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
661            Vec::with_capacity(addresses_to_compute_roots.len());
662        for address in addresses_to_compute_roots {
663            if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
664                !trie.is_root_cached()
665            {
666                tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
667            }
668        }
669
670        let parent_span = tracing::Span::current();
671        tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
672            let _enter = debug_span!(
673                target: "engine::tree::payload_processor::sparse_trie",
674                parent: &parent_span,
675                "storage_root",
676                ?address
677            )
678            .entered();
679            // SAFETY:
680            // - pointers are created from `storage_tries_mut().get_mut(address)` above;
681            // - `addresses_to_compute_roots` comes from map iteration, so addresses are unique;
682            // - we do not insert/remove entries between pointer collection and use, so pointers
683            //   stay valid and map reallocation cannot occur;
684            // - each pointer is consumed by at most one rayon task, so no aliasing mutable access.
685            unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
686        });
687    }
688
689    /// Iterates through all storage tries for which all updates were processed, computes their
690    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
691    /// for accounts trie.
692    #[instrument(
693        level = "debug",
694        target = "engine::tree::payload_processor::sparse_trie",
695        skip_all
696    )]
697    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
698        self.process_leaf_updates(false)?;
699
700        if self.pending_account_updates.is_empty() {
701            return Ok(());
702        }
703
704        self.compute_drained_storage_roots();
705
706        loop {
707            let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
708            // Now handle pending account updates that can be upgraded to a proper update.
709            let account_rlp_buf = &mut self.account_rlp_buf;
710            let mut num_promoted = 0;
711            self.pending_account_updates.retain(|addr, account| {
712                if let Some(updates) = self.storage_updates.get(addr) {
713                    if !updates.is_empty() {
714                        // If account has pending storage updates, it is still pending.
715                        return true;
716                    } else if let Some(account) = account.take() {
717                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
718                        let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
719                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
720                        num_promoted += 1;
721                        return false;
722                    }
723                }
724
725                // Get the current account state either from the trie or from latest account update.
726                let trie_account = match self.account_updates.get(addr) {
727                    Some(LeafUpdate::Changed(encoded)) => {
728                        Some(encoded).filter(|encoded| !encoded.is_empty())
729                    }
730                    // Needs to be revealed first
731                    Some(LeafUpdate::Touched) => return true,
732                    None => self.trie.get_account_value(addr),
733                };
734
735                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
736
737                let (account, storage_root) = if let Some(account) = account.take() {
738                    // If account is Some(_) here it means it didn't have any storage updates
739                    // and we can fetch the storage root directly from the account trie.
740                    //
741                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
742                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
743
744                    (account, storage_root)
745                } else {
746                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
747                };
748
749                let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
750                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
751                num_promoted += 1;
752
753                false
754            });
755            span.record("promoted", num_promoted);
756            drop(span);
757
758            // Only exit when no new updates are processed.
759            //
760            // We need to keep iterating if any updates are being drained because that might
761            // indicate that more pending account updates can be promoted.
762            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
763                break
764            }
765        }
766
767        Ok(())
768    }
769
770    fn dispatch_pending_targets(&mut self) {
771        if self.pending_targets.is_empty() {
772            return;
773        }
774
775        let _span = debug_span!("dispatch_pending_targets").entered();
776        let (targets, chunking_length) = self.pending_targets.take();
777        dispatch_with_chunking(
778            targets,
779            chunking_length,
780            self.chunk_size,
781            self.max_targets_for_chunking,
782            self.proof_worker_handle.available_account_workers(),
783            self.proof_worker_handle.available_storage_workers(),
784            MultiProofTargetsV2::chunks,
785            |proof_targets| {
786                if let Err(e) =
787                    self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
788                        targets: proof_targets,
789                        proof_result_sender: ProofResultContext::new(
790                            self.proof_result_tx.clone(),
791                            HashedPostState::default(),
792                            Instant::now(),
793                        ),
794                    })
795                {
796                    error!("failed to dispatch account multiproof: {e:?}");
797                }
798            },
799        );
800    }
801}
802
803/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
804fn encode_account_leaf_value(
805    account: Option<Account>,
806    storage_root: B256,
807    account_rlp_buf: &mut Vec<u8>,
808) -> Vec<u8> {
809    if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
810        return Vec::new();
811    }
812
813    account_rlp_buf.clear();
814    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
815    account_rlp_buf.clone()
816}
817
818/// Pending proof targets queued for dispatch to proof workers, along with their count.
819#[derive(Default)]
820struct PendingTargets {
821    /// The proof targets.
822    targets: MultiProofTargetsV2,
823    /// Number of account + storage proof targets currently queued.
824    len: usize,
825}
826
827impl PendingTargets {
828    /// Returns the number of pending targets.
829    const fn len(&self) -> usize {
830        self.len
831    }
832
833    /// Returns `true` if there are no pending targets.
834    const fn is_empty(&self) -> bool {
835        self.len == 0
836    }
837
838    /// Takes the pending targets, replacing with empty defaults.
839    fn take(&mut self) -> (MultiProofTargetsV2, usize) {
840        (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
841    }
842
843    /// Adds a target to the account targets.
844    fn push_account_target(&mut self, target: ProofV2Target) {
845        self.targets.account_targets.push(target);
846        self.len += 1;
847    }
848
849    /// Extends storage targets for the given address.
850    fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
851        self.len += targets.len();
852        self.targets.storage_targets.entry(*address).or_default().extend(targets);
853    }
854}
855
856/// Message type for the sparse trie task.
857enum SparseTrieTaskMessage {
858    /// A hashed state update ready to be processed.
859    HashedState(HashedPostState),
860    /// Prefetch proof targets (passed through directly).
861    PrefetchProofs(MultiProofTargetsV2),
862    /// Signals that all state updates have been received.
863    FinishedStateUpdates,
864}
865
866/// Outcome of the state root computation, including the state root itself with
867/// the trie updates.
868#[derive(Debug, Clone)]
869pub struct StateRootComputeOutcome {
870    /// The state root.
871    pub state_root: B256,
872    /// The trie updates.
873    pub trie_updates: Arc<TrieUpdates>,
874    /// Debug recorders taken from the sparse tries, keyed by `None` for account trie
875    /// and `Some(address)` for storage tries.
876    #[cfg(feature = "trie-debug")]
877    pub debug_recorders: Vec<(Option<B256>, TrieDebugRecorder)>,
878}
879
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_trie_sparse::ArenaParallelSparseTrie;

    /// A hashed state update sent to the hashing task must come out on the other side with its
    /// account and storage data intact, followed by the finish marker and a closed channel.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();

        let address = keccak256(Address::random());
        let slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let value = U256::from(999);

        // One account with a single storage slot.
        let mut account_storage = reth_trie::HashedStorage::new(false);
        account_storage.storage.insert(slot, value);

        let mut hashed_state = HashedPostState::default();
        hashed_state.accounts.insert(
            address,
            Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
        );
        hashed_state.storages.insert(address, account_storage);

        let expected_state = hashed_state.clone();

        let worker = std::thread::spawn(move || {
            SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
                updates_rx,
                hashed_state_tx,
                MultiProofTaskMetrics::default(),
            );
        });

        updates_tx.send(MultiProofMessage::HashedStateUpdate(hashed_state)).unwrap();
        updates_tx.send(MultiProofMessage::FinishedStateUpdates).unwrap();
        drop(updates_tx);

        // First message: the forwarded hashed state.
        let received = match hashed_state_rx.recv().unwrap() {
            SparseTrieTaskMessage::HashedState(state) => state,
            _ => panic!("expected HashedState message"),
        };

        let expected_account = expected_state.accounts[&address].unwrap();
        let received_account = received.accounts.get(&address).unwrap().unwrap();
        assert_eq!(received_account.balance, expected_account.balance);
        assert_eq!(received_account.nonce, expected_account.nonce);

        let received_storage = received.storages.get(&address).unwrap();
        assert_eq!(*received_storage.storage.get(&slot).unwrap(), value);

        // Second message: the finish marker.
        assert!(matches!(
            hashed_state_rx.recv().unwrap(),
            SparseTrieTaskMessage::FinishedStateUpdates
        ));

        // Afterwards the sending side is gone and the task thread terminates.
        assert!(hashed_state_rx.recv().is_err());
        worker.join().unwrap();
    }

    /// No account plus an empty storage root encodes to nothing and leaves the scratch buffer
    /// as it was.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let mut scratch = vec![0xAB];
        let leaf = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

        assert!(leaf.is_empty());
        // The early return must not modify the caller's buffer.
        assert_eq!(scratch, vec![0xAB]);
    }

    /// A non-empty account encodes to RLP that decodes back to the same fields, and the scratch
    /// buffer ends up holding exactly that encoding.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let storage_root = B256::from([0x99; 32]);
        let mut scratch = vec![0x00, 0x01];

        let leaf = encode_account_leaf_value(
            Some(Account {
                nonce: 7,
                balance: U256::from(42),
                bytecode_hash: Some(B256::from([0xAA; 32])),
            }),
            storage_root,
            &mut scratch,
        );
        let decoded = TrieAccount::decode(&mut &leaf[..]).expect("valid account RLP");

        assert_eq!(decoded.nonce, 7);
        assert_eq!(decoded.balance, U256::from(42));
        assert_eq!(decoded.storage_root, storage_root);
        assert_eq!(scratch, leaf);
    }
}