Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use std::sync::Arc;
4
5use crate::tree::{
6    multiproof::{
7        dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8        StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9    },
10    payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19    updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20    TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24    proof_task::{
25        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26    },
27    root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30    errors::{SparseStateTrieErrorKind, SparseTrieErrorKind, SparseTrieResult},
31    ConfigurableSparseTrie, DeferredDrops, LeafUpdate, RevealableSparseTrie, SparseStateTrie,
32    SparseTrie,
33};
34use revm_primitives::{hash_map::Entry, B256Map};
35use tracing::{debug, debug_span, error, instrument, trace_span};
36
/// Maximum number of pending/prewarm updates that we accumulate in memory before actually applying.
///
/// Once more than this many messages have been received without being applied, `run` applies the
/// buffered updates to the trie even if further messages are still queued on the updates channel.
const MAX_PENDING_UPDATES: usize = 100;
39
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
///
/// The type parameters `A` and `S` are forwarded unchanged to the wrapped [`SparseStateTrie`].
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
    /// Sender for proof results.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Receiver for proof results directly from workers.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Receives updates from execution and prewarming.
    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
    /// `SparseStateTrie` used for computing the state root.
    trie: SparseStateTrie<A, S>,
    /// The parent block's state root.
    parent_state_root: B256,
    /// Handle to the proof worker pools (storage and account).
    proof_worker_handle: ProofWorkerHandle,

    /// The size of proof targets chunk to spawn in one calculation.
    ///
    /// `run` also uses this as the number of accumulated pending targets above which they are
    /// dispatched eagerly.
    ///
    /// NOTE(review): this field is a plain `usize`, so it has no `None` state; whether a
    /// particular value disables chunking is decided by the dispatch code — confirm there.
    chunk_size: usize,
    /// If this number is exceeded and chunking is enabled, then this will override whether or not
    /// there are any active workers and force chunking across workers. This is to prevent tasks
    /// which are very long from hitting a single worker.
    max_targets_for_chunking: usize,

    /// Account trie updates.
    account_updates: B256Map<LeafUpdate>,
    /// Storage trie updates. hashed address -> slot -> update.
    storage_updates: B256Map<B256Map<LeafUpdate>>,

    /// Account updates that are buffered but were not yet applied to the trie.
    new_account_updates: B256Map<LeafUpdate>,
    /// Storage updates that are buffered but were not yet applied to the trie.
    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
    /// Account updates that are blocked by storage root calculation or account reveal.
    ///
    /// Those are being moved into `account_updates` once storage roots
    /// are revealed and/or calculated.
    ///
    /// Invariant: for each entry in `pending_account_updates` account must either be already
    /// revealed in the trie or have an entry in `account_updates`.
    ///
    /// Values can be either of:
    ///   - None: account had a storage update and is awaiting storage root calculation and/or
    ///     account node reveal to complete.
    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
    ///     to complete.
    pending_account_updates: B256Map<Option<Option<Account>>>,
    /// Cache of account proof targets that were already fetched/requested from the proof workers.
    /// account -> lowest `min_len` requested.
    fetched_account_targets: B256Map<u8>,
    /// Cache of storage proof targets that have already been fetched/requested from the proof
    /// workers. account -> slot -> lowest `min_len` requested.
    fetched_storage_targets: B256Map<B256Map<u8>>,
    /// Reusable buffer for RLP encoding of accounts.
    account_rlp_buf: Vec<u8>,
    /// Whether the last state update has been received.
    finished_state_updates: bool,
    /// Accumulated account leaf update cache hits.
    account_cache_hits: u64,
    /// Accumulated account leaf update cache misses.
    account_cache_misses: u64,
    /// Accumulated storage leaf update cache hits.
    storage_cache_hits: u64,
    /// Accumulated storage leaf update cache misses.
    storage_cache_misses: u64,
    /// Pending proof targets queued for dispatch to proof workers.
    pending_targets: PendingTargets,
    /// Number of pending execution/prewarming updates received but not yet passed to
    /// `update_leaves`.
    pending_updates: usize,

    /// Metrics for the sparse trie.
    metrics: MultiProofTaskMetrics,
}
113
114impl<A, S> SparseTrieCacheTask<A, S>
115where
116    A: SparseTrie + Default,
117    S: SparseTrie + Default + Clone,
118{
119    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
120    pub(super) fn new_with_trie(
121        executor: &Runtime,
122        updates: CrossbeamReceiver<StateRootMessage>,
123        proof_worker_handle: ProofWorkerHandle,
124        metrics: MultiProofTaskMetrics,
125        trie: SparseStateTrie<A, S>,
126        parent_state_root: B256,
127        chunk_size: usize,
128    ) -> Self {
129        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
130        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
131
132        let parent_span = tracing::Span::current();
133        let hashing_metrics = metrics.clone();
134        executor.spawn_blocking_named("trie-hashing", move || {
135            let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
136            Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
137        });
138
139        Self {
140            proof_result_tx,
141            proof_result_rx,
142            updates: hashed_state_rx,
143            proof_worker_handle,
144            trie,
145            parent_state_root,
146            chunk_size,
147            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
148            account_updates: Default::default(),
149            storage_updates: Default::default(),
150            new_account_updates: Default::default(),
151            new_storage_updates: Default::default(),
152            pending_account_updates: Default::default(),
153            fetched_account_targets: Default::default(),
154            fetched_storage_targets: Default::default(),
155            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
156            finished_state_updates: Default::default(),
157            account_cache_hits: 0,
158            account_cache_misses: 0,
159            storage_cache_hits: 0,
160            storage_cache_misses: 0,
161            pending_targets: Default::default(),
162            pending_updates: Default::default(),
163            metrics,
164        }
165    }
166
167    /// Runs the hashing task that drains updates from the channel and converts them to
168    /// `HashedPostState` in parallel.
169    fn run_hashing_task(
170        updates: CrossbeamReceiver<StateRootMessage>,
171        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
172        metrics: MultiProofTaskMetrics,
173    ) {
174        let mut total_idle_time = std::time::Duration::ZERO;
175        let mut idle_start = Instant::now();
176
177        while let Ok(message) = updates.recv() {
178            total_idle_time += idle_start.elapsed();
179
180            let msg = match message {
181                StateRootMessage::PrefetchProofs(targets) => {
182                    SparseTrieTaskMessage::PrefetchProofs(targets)
183                }
184                StateRootMessage::StateUpdate(_, state) => {
185                    let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
186                    let hashed = evm_state_to_hashed_post_state(state);
187                    SparseTrieTaskMessage::HashedState(hashed)
188                }
189                StateRootMessage::FinishedStateUpdates => {
190                    SparseTrieTaskMessage::FinishedStateUpdates
191                }
192                StateRootMessage::BlockAccessList(_) => {
193                    idle_start = Instant::now();
194                    continue;
195                }
196                StateRootMessage::HashedStateUpdate(state) => {
197                    SparseTrieTaskMessage::HashedState(state)
198                }
199            };
200            if hashed_state_tx.send(msg).is_err() {
201                break;
202            }
203
204            idle_start = Instant::now();
205        }
206
207        metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
208    }
209
210    /// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
211    ///
212    /// Should be called after the state root result has been sent.
213    ///
214    /// When `disable_pruning` is true, the trie is preserved without any node pruning,
215    /// storage trie eviction, or capacity shrinking, keeping the full cache intact for
216    /// benchmarking purposes.
217    pub(super) fn into_trie_for_reuse(
218        self,
219        max_hot_slots: usize,
220        max_hot_accounts: usize,
221        max_nodes_capacity: usize,
222        max_values_capacity: usize,
223        disable_pruning: bool,
224        updates: &TrieUpdates,
225    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
226        let Self { mut trie, .. } = self;
227        trie.commit_updates(updates);
228        if !disable_pruning {
229            trie.prune(max_hot_slots, max_hot_accounts);
230            trie.shrink_to(max_nodes_capacity, max_values_capacity);
231        }
232        let deferred = trie.take_deferred_drops();
233        (trie, deferred)
234    }
235
236    /// Clears and shrinks the trie, discarding all state.
237    ///
238    /// Use this when the payload was invalid or cancelled - we don't want to preserve
239    /// potentially invalid trie state, but we keep the allocations for reuse.
240    pub(super) fn into_cleared_trie(
241        self,
242        max_nodes_capacity: usize,
243        max_values_capacity: usize,
244    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
245        let Self { mut trie, .. } = self;
246        trie.clear();
247        trie.shrink_to(max_nodes_capacity, max_values_capacity);
248        let deferred = trie.take_deferred_drops();
249        (trie, deferred)
250    }
251
    /// Runs the sparse trie task to completion.
    ///
    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
    /// to the trie and schedules proof fetching when needed.
    ///
    /// This concludes once the last state update has been received and processed.
    ///
    /// # Errors
    ///
    /// Returns a [`ParallelStateRootError`] if the updates channel disconnects before the loop
    /// finishes, if a proof result carries an error, if revealing a multiproof or applying leaf
    /// updates fails, or if the final root calculation fails with anything other than a blind
    /// account trie.
    #[instrument(
        name = "SparseTrieCacheTask::run",
        level = "debug",
        target = "engine::tree::payload_processor::sparse_trie",
        skip_all
    )]
    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
        // Start of the whole run, used for the total duration metric at the end.
        let now = Instant::now();

        // Time between finishing one loop iteration and waking up for the next message.
        let mut total_idle_time = std::time::Duration::ZERO;
        let mut idle_start = Instant::now();

        loop {
            let mut t = Instant::now();
            // `select_biased!` prefers the earlier arm when several are ready, so state updates
            // take priority over proof results.
            crossbeam_channel::select_biased! {
                recv(self.updates) -> message => {
                    let wake = Instant::now();

                    let update = match message {
                        Ok(m) => m,
                        Err(_) => {
                            return Err(ParallelStateRootError::Other(
                                "updates channel disconnected before state root calculation".to_string(),
                            ))
                        }
                    };

                    total_idle_time += wake.duration_since(idle_start);
                    self.metrics
                        .sparse_trie_channel_wait_duration_histogram
                        .record(wake.duration_since(t));

                    // The message is only buffered here; it is applied to the trie later by
                    // `process_new_updates`. Every message kind bumps the counter.
                    self.on_message(update);
                    self.pending_updates += 1;
                }
                recv(self.proof_result_rx) -> message => {
                    let phase_end = Instant::now();
                    total_idle_time += phase_end.duration_since(idle_start);
                    self.metrics
                        .sparse_trie_channel_wait_duration_histogram
                        .record(phase_end.duration_since(t));
                    t = phase_end;

                    // `self` holds `proof_result_tx`, so the channel cannot be disconnected.
                    let Ok(result) = message else {
                        unreachable!("we own the sender half")
                    };

                    // Merge every proof result that is already queued so that they are revealed
                    // in a single pass; the first error aborts the run.
                    let mut result = result.result?;
                    while let Ok(next) = self.proof_result_rx.try_recv() {
                        let res = next.result?;
                        result.extend(res);
                    }

                    let phase_end = Instant::now();
                    self.metrics
                        .sparse_trie_proof_coalesce_duration_histogram
                        .record(phase_end.duration_since(t));
                    t = phase_end;

                    self.on_proof_result(result)?;
                    self.metrics
                        .sparse_trie_reveal_multiproof_duration_histogram
                        .record(t.elapsed());
                },
            }

            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
                // If we don't have any pending messages, we can spend some time on computing
                // storage roots and promoting account updates.
                self.dispatch_pending_targets();
                t = Instant::now();
                self.process_new_updates()?;
                self.promote_pending_account_updates()?;
                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());

                // Done once the final state update was seen and no account or storage update is
                // left in the tracked update maps.
                if self.finished_state_updates &&
                    self.account_updates.is_empty() &&
                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
                {
                    break;
                }

                self.dispatch_pending_targets();

                // If there's still no pending updates spend some time pre-computing the account
                // trie upper hashes
                if self.proof_result_rx.is_empty() {
                    self.trie.calculate_subtries();
                }
            } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
                // If we don't have any pending updates OR we've accumulated a lot already, apply
                // them to the trie,
                t = Instant::now();
                self.process_new_updates()?;
                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
                self.dispatch_pending_targets();
            } else if self.pending_targets.len() > self.chunk_size {
                // Make sure to dispatch targets if we've accumulated a lot of them.
                self.dispatch_pending_targets();
            }

            // Everything from here until the next wake-up counts as idle time.
            idle_start = Instant::now();
        }

        self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());

        debug!(target: "engine::root", "All proofs processed, ending calculation");

        let start = Instant::now();
        let (state_root, trie_updates) = match self.trie.root_with_updates() {
            Ok(result) => result,
            Err(err)
                if matches!(
                    err.kind(),
                    SparseStateTrieErrorKind::Sparse(SparseTrieErrorKind::Blind)
                ) =>
            {
                // A still-blind account trie means this block never changed state, so preserve
                // the cached parent root instead of fetching and revealing
                // the unchanged root node.
                (self.parent_state_root, TrieUpdates::default())
            }
            Err(err) => {
                return Err(ParallelStateRootError::Other(format!(
                    "could not calculate state root: {err:?}"
                )))
            }
        };

        #[cfg(feature = "trie-debug")]
        let debug_recorders = self.trie.take_debug_recorders();

        let end = Instant::now();
        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));

        // Report the accumulated cache statistics and reset the counters afterwards.
        self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
        self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
        self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
        self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
        self.account_cache_hits = 0;
        self.account_cache_misses = 0;
        self.storage_cache_hits = 0;
        self.storage_cache_misses = 0;

        Ok(StateRootComputeOutcome {
            state_root,
            trie_updates: Arc::new(trie_updates),
            #[cfg(feature = "trie-debug")]
            debug_recorders,
        })
    }
410
411    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
412    fn on_message(&mut self, message: SparseTrieTaskMessage) {
413        match message {
414            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
415            SparseTrieTaskMessage::HashedState(hashed_state) => {
416                self.on_hashed_state_update(hashed_state)
417            }
418            SparseTrieTaskMessage::FinishedStateUpdates => self.finished_state_updates = true,
419        }
420    }
421
422    #[instrument(
423        level = "trace",
424        target = "engine::tree::payload_processor::sparse_trie",
425        skip_all
426    )]
427    fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
428        for target in targets.account_targets {
429            // Only touch accounts that are not yet present in the updates set.
430            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
431        }
432
433        for (address, slots) in targets.storage_targets {
434            if !slots.is_empty() {
435                // Look up outer map once per address instead of once per slot.
436                let new_updates = self.new_storage_updates.entry(address).or_default();
437                for slot in slots {
438                    // Only touch storages that are not yet present in the updates set.
439                    new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
440                }
441            }
442
443            // Touch corresponding account leaf to make sure its revealed in accounts trie for
444            // storage root update.
445            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
446        }
447    }
448
449    /// Processes a hashed state update and encodes all state changes as trie updates.
450    #[instrument(
451        level = "trace",
452        target = "engine::tree::payload_processor::sparse_trie",
453        skip_all
454    )]
455    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
456        for (address, storage) in hashed_state_update.storages {
457            if !storage.storage.is_empty() {
458                // Look up outer maps once per address instead of once per slot.
459                let new_updates = self.new_storage_updates.entry(address).or_default();
460                let mut existing_updates = self.storage_updates.get_mut(&address);
461
462                for (slot, value) in storage.storage {
463                    self.trie.record_slot_touch(address, slot);
464
465                    let encoded = if value.is_zero() {
466                        Vec::new()
467                    } else {
468                        alloy_rlp::encode_fixed_size(&value).to_vec()
469                    };
470                    new_updates.insert(slot, LeafUpdate::Changed(encoded));
471
472                    // Remove an existing storage update if it exists.
473                    if let Some(ref mut existing) = existing_updates {
474                        existing.remove(&slot);
475                    }
476                }
477            }
478
479            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
480            // trie for storage root update.
481            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
482
483            // Make sure account is tracked in `pending_account_updates` so that once storage root
484            // is computed, it will be updated in the accounts trie.
485            self.pending_account_updates.entry(address).or_insert(None);
486        }
487
488        for (address, account) in hashed_state_update.accounts {
489            self.trie.record_account_touch(address);
490
491            // Track account as touched.
492            //
493            // This might overwrite an existing update, which is fine, because storage root from it
494            // is already tracked in the trie and can be easily fetched again.
495            self.new_account_updates.insert(address, LeafUpdate::Touched);
496
497            // Track account in `pending_account_updates` so that once storage root is computed,
498            // it will be updated in the accounts trie.
499            self.pending_account_updates.insert(address, Some(account));
500        }
501    }
502
503    fn on_proof_result(
504        &mut self,
505        result: DecodedMultiProofV2,
506    ) -> Result<(), ParallelStateRootError> {
507        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
508            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
509        })
510    }
511
512    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
513        if self.pending_updates == 0 {
514            return Ok(());
515        }
516
517        let _span = debug_span!("process_new_updates").entered();
518        self.pending_updates = 0;
519
520        // Firstly apply all new storage and account updates to the tries.
521        self.process_leaf_updates(true)?;
522
523        for (address, mut new) in self.new_storage_updates.drain() {
524            match self.storage_updates.entry(address) {
525                Entry::Vacant(entry) => {
526                    entry.insert(new); // insert the whole map at once, no per-slot loop
527                }
528                Entry::Occupied(mut entry) => {
529                    let updates = entry.get_mut();
530                    for (slot, new) in new.drain() {
531                        match updates.entry(slot) {
532                            Entry::Occupied(mut slot_entry) => {
533                                if new.is_changed() {
534                                    slot_entry.insert(new);
535                                }
536                            }
537                            Entry::Vacant(slot_entry) => {
538                                slot_entry.insert(new);
539                            }
540                        }
541                    }
542                }
543            }
544        }
545
546        for (address, new) in self.new_account_updates.drain() {
547            match self.account_updates.entry(address) {
548                Entry::Occupied(mut entry) => {
549                    if new.is_changed() {
550                        entry.insert(new);
551                    }
552                }
553                Entry::Vacant(entry) => {
554                    entry.insert(new);
555                }
556            }
557        }
558
559        Ok(())
560    }
561
    /// Applies all account and storage leaf updates to corresponding tries and collects any new
    /// multiproof targets.
    ///
    /// When `new` is `true` the buffered `new_storage_updates` / `new_account_updates` maps are
    /// processed, otherwise the long-lived `storage_updates` / `account_updates` maps.
    ///
    /// Storage tries are handled first, one address at a time, followed by the accounts trie via
    /// `process_account_leaf_updates`. Proof targets reported by `update_leaves` are queued in
    /// `pending_targets`, and the storage cache hit/miss counters are updated along the way.
    #[instrument(
        level = "trace",
        target = "engine::tree::payload_processor::sparse_trie",
        skip_all
    )]
    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
        let storage_updates =
            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };

        // Process all storage updates, skipping tries with no pending updates.
        let span = trace_span!("process_storage_leaf_updates").entered();
        for (address, updates) in storage_updates {
            if updates.is_empty() {
                continue;
            }
            let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();

            let trie = self.trie.get_or_create_storage_trie_mut(*address);
            let fetched = self.fetched_storage_targets.entry(*address).or_default();
            let mut targets = Vec::new();

            let updates_len_before = updates.len();
            // The callback receives the paths for which a proof is required. A target is only
            // queued if the path was never requested before, or was only requested with a larger
            // `min_len`; `fetched` always remembers the lowest `min_len` requested so far.
            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
                Entry::Occupied(mut entry) => {
                    if min_len < *entry.get() {
                        entry.insert(min_len);
                        targets.push(ProofV2Target::new(path).with_min_len(min_len));
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(min_len);
                    targets.push(ProofV2Target::new(path).with_min_len(min_len));
                }
            })?;
            // Entries removed from `updates` by `update_leaves` count as cache hits, the ones
            // still left in the map as misses.
            let updates_len_after = updates.len();
            self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
            self.storage_cache_misses += updates_len_after as u64;

            if !targets.is_empty() {
                self.pending_targets.extend_storage_targets(address, targets);
            }
        }

        // End the storage span before the accounts trie is processed.
        drop(span);

        // Process account trie updates and fill the account targets.
        self.process_account_leaf_updates(new)?;

        Ok(())
    }
614
    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
    ///
    /// When `new` is `true` the buffered `new_account_updates` map is processed, otherwise the
    /// long-lived `account_updates` map. The account cache hit/miss counters are updated from the
    /// number of entries drained from, and left in, that map.
    ///
    /// Returns whether any updates were drained (applied to the trie).
    #[instrument(
        level = "trace",
        target = "engine::tree::payload_processor::sparse_trie",
        skip_all
    )]
    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
        let account_updates =
            if new { &mut self.new_account_updates } else { &mut self.account_updates };

        let updates_len_before = account_updates.len();

        // The callback receives the accounts for which a proof is required. A target is only
        // queued if the account was never requested before, or was only requested with a larger
        // `min_len`; `fetched_account_targets` remembers the lowest `min_len` requested so far.
        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
            match self.fetched_account_targets.entry(target) {
                Entry::Occupied(mut entry) => {
                    if min_len < *entry.get() {
                        entry.insert(min_len);
                        self.pending_targets
                            .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
                    }
                }
                Entry::Vacant(entry) => {
                    entry.insert(min_len);
                    self.pending_targets
                        .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
                }
            }
        })?;

        // Drained entries count as cache hits, entries still left in the map as misses.
        let updates_len_after = account_updates.len();
        self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
        self.account_cache_misses += updates_len_after as u64;

        Ok(updates_len_after < updates_len_before)
    }
652
    /// Computes storage roots for accounts whose storage updates are fully drained.
    ///
    /// For each storage trie T that:
    /// 1. was modified in the current block,
    /// 2. all the storage updates are fully drained,
    /// 3. but the storage root hasn't been updated yet,
    ///
    /// we trigger storage root computation on a rayon pool.
    ///
    /// # Panics
    ///
    /// Panics if computing the root of one of the selected storage tries fails.
    fn compute_drained_storage_roots(&mut self) {
        // Addresses with an entry in `storage_updates` whose slot map is empty.
        let addresses_to_compute_roots: Vec<_> = self
            .storage_updates
            .iter()
            .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
            .collect();

        // Raw pointer wrapper that lets the storage tries be handed to rayon workers.
        struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
        // SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
        // documented at the use site below.
        unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}

        // Keep only the tries that exist and do not have a cached root yet.
        let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
            Vec::with_capacity(addresses_to_compute_roots.len());
        for address in addresses_to_compute_roots {
            if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
                !trie.is_root_cached()
            {
                tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
            }
        }

        if tries_to_compute_roots.is_empty() {
            return;
        }

        let parent_span =
            debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
        tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
            // The address is only attached to the span when TRACE level is enabled.
            let span = if tracing::enabled!(tracing::Level::TRACE) {
                debug_span!(
                    target: "engine::tree::payload_processor::sparse_trie",
                    parent: &parent_span,
                    "storage_root",
                    ?address
                )
            } else {
                debug_span!(
                    target: "engine::tree::payload_processor::sparse_trie",
                    parent: &parent_span,
                    "storage_root",
                )
            };
            let _enter = span.entered();
            // SAFETY:
            // - pointers are created from `storage_tries_mut().get_mut(address)` above;
            // - `addresses_to_compute_roots` comes from map iteration, so addresses are unique;
            // - we do not insert/remove entries between pointer collection and use, so pointers
            //   stay valid and map reallocation cannot occur;
            // - each pointer is consumed by at most one rayon task, so no aliasing mutable access.
            unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
        });
    }
714
    /// Iterates through all storage tries for which all updates were processed, computes their
    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
    /// for accounts trie.
    ///
    /// The method first runs `process_leaf_updates(false)` and returns early when no account
    /// update is pending. Otherwise it calls `compute_drained_storage_roots` and then repeatedly
    /// sweeps `pending_account_updates`: every entry whose leaf value can be encoded is removed
    /// from the pending map and inserted into `account_updates` as [`LeafUpdate::Changed`].
    ///
    /// An entry stays pending while its address still has non-empty `storage_updates`, or, for
    /// entries that need the current account value, while `account_updates` holds
    /// [`LeafUpdate::Touched`] for that address.
    ///
    /// Sweeping stops once a pass promotes nothing or `process_account_leaf_updates(false)`
    /// returns `false`.
    ///
    /// # Errors
    ///
    /// Propagates errors returned by `process_leaf_updates` and `process_account_leaf_updates`.
    ///
    /// # Panics
    ///
    /// Panics if a storage root that is expected to be available cannot be obtained from the
    /// trie, or if an existing account leaf value is not valid [`TrieAccount`] RLP.
    #[instrument(
        level = "trace",
        target = "engine::tree::payload_processor::sparse_trie",
        skip_all
    )]
    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
        self.process_leaf_updates(false)?;

        // Nothing to promote.
        if self.pending_account_updates.is_empty() {
            return Ok(());
        }

        self.compute_drained_storage_roots();

        loop {
            // The `promoted` field is declared empty here and filled in after the sweep.
            let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
            // Now handle pending account updates that can be upgraded to a proper update.
            //
            // Scratch buffer handed to `encode_account_leaf_value` for every promoted account.
            let account_rlp_buf = &mut self.account_rlp_buf;
            let mut num_promoted = 0;
            // `retain` keeps entries for which the closure returns `true` (still pending) and
            // drops those for which it returns `false` (promoted).
            self.pending_account_updates.retain(|addr, account| {
                if let Some(updates) = self.storage_updates.get(addr) {
                    if !updates.is_empty() {
                        // If account has pending storage updates, it is still pending.
                        return true;
                    } else if let Some(account) = account.take() {
                        // Storage updates are drained and a new account value is pending: pair
                        // it with the storage root taken from the storage trie. `take()` leaves
                        // `None` behind in the entry, which is removed right after anyway.
                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
                        let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
                        num_promoted += 1;
                        return false;
                    }
                    // Otherwise the entry carries no new account value; fall through and rebuild
                    // the leaf from the current account value below.
                }

                // Get the current account state either from the trie or from latest account update.
                let trie_account = match self.account_updates.get(addr) {
                    Some(LeafUpdate::Changed(encoded)) => {
                        // An empty encoding is treated as "no account".
                        Some(encoded).filter(|encoded| !encoded.is_empty())
                    }
                    // Needs to be revealed first
                    Some(LeafUpdate::Touched) => return true,
                    None => self.trie.get_account_value(addr),
                };

                // Decode into an owned value so the borrow of the encoded bytes ends here.
                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));

                let (account, storage_root) = if let Some(account) = account.take() {
                    // If account is Some(_) here it means it didn't have any storage updates
                    // and we can fetch the storage root directly from the account trie.
                    //
                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);

                    (account, storage_root)
                } else {
                    // No new account value: keep the existing account fields and pair them with
                    // the storage root taken from the storage trie.
                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
                };

                let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
                num_promoted += 1;

                false
            });
            span.record("promoted", num_promoted);
            drop(span);

            // Only exit when no new updates are processed.
            //
            // We need to keep iterating if any updates are being drained because that might
            // indicate that more pending account updates can be promoted.
            //
            // NOTE(review): `process_account_leaf_updates` presumably returns whether any update
            // was drained; confirm against its definition.
            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
                break
            }
        }

        Ok(())
    }
795
796    fn dispatch_pending_targets(&mut self) {
797        if self.pending_targets.is_empty() {
798            return;
799        }
800
801        let _span = trace_span!("dispatch_pending_targets").entered();
802        let (targets, chunking_length) = self.pending_targets.take();
803        dispatch_with_chunking(
804            targets,
805            chunking_length,
806            self.chunk_size,
807            self.max_targets_for_chunking,
808            self.proof_worker_handle.has_multiple_idle_account_workers(),
809            self.proof_worker_handle.has_multiple_idle_storage_workers(),
810            MultiProofTargetsV2::chunks,
811            |proof_targets| {
812                if let Err(e) =
813                    self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
814                        targets: proof_targets,
815                        proof_result_sender: ProofResultContext::new(
816                            self.proof_result_tx.clone(),
817                            HashedPostState::default(),
818                            Instant::now(),
819                        ),
820                    })
821                {
822                    error!("failed to dispatch account multiproof: {e:?}");
823                }
824            },
825        );
826    }
827}
828
829/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
830fn encode_account_leaf_value(
831    account: Option<Account>,
832    storage_root: B256,
833    account_rlp_buf: &mut Vec<u8>,
834) -> Vec<u8> {
835    if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
836        return Vec::new();
837    }
838
839    account_rlp_buf.clear();
840    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
841    account_rlp_buf.clone()
842}
843
844/// Pending proof targets queued for dispatch to proof workers, along with their count.
845#[derive(Default)]
846struct PendingTargets {
847    /// The proof targets.
848    targets: MultiProofTargetsV2,
849    /// Number of account + storage proof targets currently queued.
850    len: usize,
851}
852
853impl PendingTargets {
854    /// Returns the number of pending targets.
855    const fn len(&self) -> usize {
856        self.len
857    }
858
859    /// Returns `true` if there are no pending targets.
860    const fn is_empty(&self) -> bool {
861        self.len == 0
862    }
863
864    /// Takes the pending targets, replacing with empty defaults.
865    fn take(&mut self) -> (MultiProofTargetsV2, usize) {
866        (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
867    }
868
869    /// Adds a target to the account targets.
870    fn push_account_target(&mut self, target: ProofV2Target) {
871        self.targets.account_targets.push(target);
872        self.len += 1;
873    }
874
875    /// Extends storage targets for the given address.
876    fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
877        self.len += targets.len();
878        self.targets.storage_targets.entry(*address).or_default().extend(targets);
879    }
880}
881
/// Messages consumed by the sparse trie task.
///
/// The hashing task (`run_hashing_task`) emits these on its output channel; the forwarding test
/// in this file shows a `HashedStateUpdate` arriving as [`Self::HashedState`], followed by
/// [`Self::FinishedStateUpdates`].
enum SparseTrieTaskMessage {
    /// A hashed state update ready to be processed.
    HashedState(HashedPostState),
    /// Prefetch proof targets (passed through directly).
    PrefetchProofs(MultiProofTargetsV2),
    /// Signals that all state updates have been received; no further state updates follow.
    FinishedStateUpdates,
}
891
892#[cfg(test)]
893mod tests {
894    use super::*;
895    use alloy_primitives::{keccak256, Address, B256, U256};
896    use reth_provider::{
897        providers::{OverlayBuilder, OverlayStateProviderFactory},
898        test_utils::create_test_provider_factory,
899        ChainSpecProvider,
900    };
901    use reth_trie_db::ChangesetCache;
902    use reth_trie_parallel::proof_task::ProofTaskCtx;
903    use reth_trie_sparse::ArenaParallelSparseTrie;
904
905    #[test]
906    fn test_run_hashing_task_hashed_state_update_forwards() {
907        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
908        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
909
910        let address = keccak256(Address::random());
911        let slot = keccak256(U256::from(42).to_be_bytes::<32>());
912        let value = U256::from(999);
913
914        let mut hashed_state = HashedPostState::default();
915        hashed_state.accounts.insert(
916            address,
917            Some(Account { balance: U256::from(100), nonce: 1, bytecode_hash: None }),
918        );
919        let mut storage = reth_trie::HashedStorage::new(false);
920        storage.storage.insert(slot, value);
921        hashed_state.storages.insert(address, storage);
922
923        let expected_state = hashed_state.clone();
924
925        let handle = std::thread::spawn(move || {
926            SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
927                updates_rx,
928                hashed_state_tx,
929                MultiProofTaskMetrics::default(),
930            );
931        });
932
933        updates_tx.send(StateRootMessage::HashedStateUpdate(hashed_state)).unwrap();
934        updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
935        drop(updates_tx);
936
937        let SparseTrieTaskMessage::HashedState(received) = hashed_state_rx.recv().unwrap() else {
938            panic!("expected HashedState message");
939        };
940
941        let account = received.accounts.get(&address).unwrap().unwrap();
942        assert_eq!(account.balance, expected_state.accounts[&address].unwrap().balance);
943        assert_eq!(account.nonce, expected_state.accounts[&address].unwrap().nonce);
944
945        let storage = received.storages.get(&address).unwrap();
946        assert_eq!(*storage.storage.get(&slot).unwrap(), value);
947
948        let second = hashed_state_rx.recv().unwrap();
949        assert!(matches!(second, SparseTrieTaskMessage::FinishedStateUpdates));
950
951        assert!(hashed_state_rx.recv().is_err());
952        handle.join().unwrap();
953    }
954
955    #[test]
956    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
957        let mut account_rlp_buf = vec![0xAB];
958        let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut account_rlp_buf);
959
960        assert!(encoded.is_empty());
961        // Early return should not touch the caller's buffer.
962        assert_eq!(account_rlp_buf, vec![0xAB]);
963    }
964
965    #[test]
966    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
967        let storage_root = B256::from([0x99; 32]);
968        let account = Some(Account {
969            nonce: 7,
970            balance: U256::from(42),
971            bytecode_hash: Some(B256::from([0xAA; 32])),
972        });
973        let mut account_rlp_buf = vec![0x00, 0x01];
974
975        let encoded = encode_account_leaf_value(account, storage_root, &mut account_rlp_buf);
976        let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");
977
978        assert_eq!(decoded.nonce, 7);
979        assert_eq!(decoded.balance, U256::from(42));
980        assert_eq!(decoded.storage_root, storage_root);
981        assert_eq!(account_rlp_buf, encoded);
982    }
983
984    #[test]
985    fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
986        let runtime = reth_tasks::Runtime::test();
987        let provider_factory = create_test_provider_factory();
988        let anchor_hash = provider_factory.chain_spec().genesis_hash();
989        let overlay_factory = OverlayStateProviderFactory::new(
990            provider_factory,
991            OverlayBuilder::<reth_chain_state::EthPrimitives>::new(
992                anchor_hash,
993                ChangesetCache::new(),
994            ),
995        );
996        let proof_worker_handle =
997            ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);
998
999        let default_trie = RevealableSparseTrie::blind_from(ConfigurableSparseTrie::Arena(
1000            ArenaParallelSparseTrie::default(),
1001        ));
1002        let trie = SparseStateTrie::default()
1003            .with_accounts_trie(default_trie.clone())
1004            .with_default_storage_trie(default_trie)
1005            .with_updates(true);
1006
1007        let parent_state_root = B256::from([0x55; 32]);
1008        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
1009        let mut task = SparseTrieCacheTask::new_with_trie(
1010            &runtime,
1011            updates_rx,
1012            proof_worker_handle,
1013            MultiProofTaskMetrics::default(),
1014            trie,
1015            parent_state_root,
1016            1,
1017        );
1018
1019        updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
1020        drop(updates_tx);
1021
1022        let outcome = task.run().expect("state root computation should succeed");
1023
1024        assert_eq!(outcome.state_root, parent_state_root);
1025        assert!(outcome.trie_updates.is_empty());
1026        assert!(task.trie.state_trie_ref().is_none(), "blind trie should not be revealed");
1027    }
1028}