Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use std::sync::Arc;
4
5use crate::tree::{
6    multiproof::{
7        dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8        StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9    },
10    payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19    updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20    TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24    proof_task::{
25        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26    },
27    root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30    errors::{SparseStateTrieErrorKind, SparseTrieErrorKind, SparseTrieResult},
31    ArenaParallelSparseTrie, DeferredDrops, LeafUpdate, RevealableSparseTrie, SparseStateTrie,
32    SparseTrie,
33};
34use revm_primitives::{hash_map::Entry, B256Map};
35use tracing::{debug, debug_span, error, instrument, trace_span};
36
37/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
///
/// The task buffers incoming leaf updates, applies them to the sparse trie, requests proofs for
/// the parts of the trie that are not revealed yet, and finally computes the state root.
38pub(super) struct SparseTrieCacheTask<A = ArenaParallelSparseTrie, S = ArenaParallelSparseTrie> {
39    /// Sender for proof results.
40    proof_result_tx: CrossbeamSender<ProofResultMessage>,
41    /// Receiver for proof results directly from workers.
42    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
43    /// Receives updates from execution and prewarming, as forwarded by the hashing task.
44    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
45    /// Sender half for the channel to send final hashed state to.
    ///
    /// Taken out of the `Option` and used when `FinishedStateUpdates` is processed.
46    final_hashed_state_tx: Option<std::sync::mpsc::Sender<HashedPostState>>,
47    /// `SparseStateTrie` used for computing the state root.
48    trie: SparseStateTrie<A, S>,
49    /// The parent block's state root.
50    parent_state_root: B256,
51    /// Handle to the proof worker pools (storage and account).
52    proof_worker_handle: ProofWorkerHandle,
53
54    /// The size of proof targets chunk to spawn in one calculation.
55    /// `run` also dispatches pending targets early once their count exceeds this value.
56    chunk_size: usize,
57    /// If this number is exceeded and chunking is enabled, then this will override whether or not
58    /// there are any active workers and force chunking across workers. This is to prevent tasks
59    /// which are very long from hitting a single worker.
60    max_targets_for_chunking: usize,
61
62    /// Account trie updates.
63    account_updates: B256Map<LeafUpdate>,
64    /// Storage trie updates. hashed address -> slot -> update.
65    storage_updates: B256Map<B256Map<LeafUpdate>>,
66
67    /// Account updates that are buffered but were not yet applied to the trie.
68    new_account_updates: B256Map<LeafUpdate>,
69    /// Storage updates that are buffered but were not yet applied to the trie.
70    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
71    /// Account updates that are blocked by storage root calculation or account reveal.
72    ///
73    /// Those are being moved into `account_updates` once storage roots
74    /// are revealed and/or calculated.
75    ///
76    /// Invariant: for each entry in `pending_account_updates` account must either be already
77    /// revealed in the trie or have an entry in `account_updates`.
78    ///
79    /// Values can be either of:
80    ///   - None: account had a storage update and is awaiting storage root calculation and/or
81    ///     account node reveal to complete.
82    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
83    ///     to complete.
84    pending_account_updates: B256Map<Option<Option<Account>>>,
85    /// Cache of account proof targets that were already fetched/requested from the proof workers.
86    /// account -> lowest `min_len` requested.
87    fetched_account_targets: B256Map<u8>,
88    /// Cache of storage proof targets that have already been fetched/requested from the proof
89    /// workers. account -> slot -> lowest `min_len` requested.
90    fetched_storage_targets: B256Map<B256Map<u8>>,
91    /// Reusable buffer for RLP encoding of accounts.
92    account_rlp_buf: Vec<u8>,
93    /// Whether the last state update has been received.
94    finished_state_updates: bool,
95    /// Accumulated account leaf update cache hits.
96    account_cache_hits: u64,
97    /// Accumulated account leaf update cache misses.
98    account_cache_misses: u64,
99    /// Accumulated storage leaf update cache hits.
100    storage_cache_hits: u64,
101    /// Accumulated storage leaf update cache misses.
102    storage_cache_misses: u64,
103    /// Pending proof targets queued for dispatch to proof workers.
104    pending_targets: PendingTargets,
105    /// Number of pending execution/prewarming updates received but not yet passed to
106    /// `update_leaves`.
107    pending_updates: usize,
108    /// Combined final hashed state.
109    ///
110    /// Sparse trie task observes and hashes all state updates, allowing it to cheaply construct a
111    /// final [`HashedPostState`] and share it with main engine thread without requiring any extra
112    /// hashing work.
113    final_hashed_state: HashedPostState,
114
115    /// Metrics for the sparse trie.
116    metrics: MultiProofTaskMetrics,
117}
118
119impl<A, S> SparseTrieCacheTask<A, S>
120where
121    A: SparseTrie + Default,
122    S: SparseTrie + Default + Clone,
123{
124    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
125    #[expect(clippy::too_many_arguments)]
126    pub(super) fn new_with_trie(
127        executor: &Runtime,
128        updates: CrossbeamReceiver<StateRootMessage>,
129        final_hashed_state_tx: std::sync::mpsc::Sender<HashedPostState>,
130        proof_worker_handle: ProofWorkerHandle,
131        metrics: MultiProofTaskMetrics,
132        trie: SparseStateTrie<A, S>,
133        parent_state_root: B256,
134        chunk_size: usize,
135    ) -> Self {
136        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
137        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
138
139        let parent_span = tracing::Span::current();
140        let hashing_metrics = metrics.clone();
141        executor.spawn_blocking_named("trie-hashing", move || {
142            let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
143            Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
144        });
145
146        Self {
147            proof_result_tx,
148            proof_result_rx,
149            updates: hashed_state_rx,
150            proof_worker_handle,
151            final_hashed_state_tx: Some(final_hashed_state_tx),
152            trie,
153            parent_state_root,
154            chunk_size,
155            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
156            account_updates: Default::default(),
157            storage_updates: Default::default(),
158            new_account_updates: Default::default(),
159            new_storage_updates: Default::default(),
160            pending_account_updates: Default::default(),
161            fetched_account_targets: Default::default(),
162            fetched_storage_targets: Default::default(),
163            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
164            finished_state_updates: Default::default(),
165            account_cache_hits: 0,
166            account_cache_misses: 0,
167            storage_cache_hits: 0,
168            storage_cache_misses: 0,
169            pending_targets: Default::default(),
170            pending_updates: Default::default(),
171            final_hashed_state: Default::default(),
172            metrics,
173        }
174    }
175
176    /// Runs the hashing task that drains updates from the channel and converts them to
177    /// `HashedPostState` in parallel.
178    fn run_hashing_task(
179        updates: CrossbeamReceiver<StateRootMessage>,
180        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
181        metrics: MultiProofTaskMetrics,
182    ) {
183        let mut total_idle_time = std::time::Duration::ZERO;
184        let mut idle_start = Instant::now();
185
186        while let Ok(message) = updates.recv() {
187            total_idle_time += idle_start.elapsed();
188
189            let msg = match message {
190                StateRootMessage::PrefetchProofs(targets) => {
191                    SparseTrieTaskMessage::PrefetchProofs(targets)
192                }
193                StateRootMessage::StateUpdate(state) => {
194                    let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
195                    let hashed = evm_state_to_hashed_post_state(state);
196                    SparseTrieTaskMessage::HashedState(hashed)
197                }
198                StateRootMessage::FinishedStateUpdates => {
199                    SparseTrieTaskMessage::FinishedStateUpdates
200                }
201                StateRootMessage::BlockAccessList(_) => {
202                    idle_start = Instant::now();
203                    continue;
204                }
205                StateRootMessage::HashedStateUpdate(state) => {
206                    SparseTrieTaskMessage::HashedState(state)
207                }
208            };
209            if hashed_state_tx.send(msg).is_err() {
210                break;
211            }
212
213            idle_start = Instant::now();
214        }
215
216        metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
217    }
218
219    /// Prunes the trie for reuse in the next payload built on top of this one.
220    ///
221    /// Should be called after the state root result has been sent.
222    ///
223    /// When `disable_pruning` is true, the trie is preserved without any node pruning or storage
224    /// trie eviction, keeping the full cache intact for benchmarking purposes.
225    pub(super) fn into_trie_for_reuse(
226        self,
227        max_hot_slots: usize,
228        max_hot_accounts: usize,
229        disable_pruning: bool,
230    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
231        let Self { mut trie, .. } = self;
232        if !disable_pruning {
233            trie.prune(max_hot_slots, max_hot_accounts);
234        }
235        let deferred = trie.take_deferred_drops();
236        (trie, deferred)
237    }
238
239    /// Clears the trie, discarding all state.
240    ///
241    /// Use this when the payload was invalid or cancelled - we don't want to preserve
242    /// potentially invalid trie state, but we keep the allocations for reuse.
243    pub(super) fn into_cleared_trie(self) -> (SparseStateTrie<A, S>, DeferredDrops) {
244        let Self { mut trie, .. } = self;
245        trie.clear();
246        let deferred = trie.take_deferred_drops();
247        (trie, deferred)
248    }
249
250    /// Runs the sparse trie task to completion.
251    ///
252    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
253    /// to the trie and schedules proof fetching when needed.
254    ///
255    /// This concludes once the last state update has been received and processed.
    ///
    /// # Errors
    ///
    /// Returns an error if the updates channel disconnects, if a received proof result carries
    /// an error, if revealing a proof or applying updates fails, or if the final root
    /// computation fails with anything other than a blind-trie error.
256    #[instrument(
257        name = "SparseTrieCacheTask::run",
258        level = "debug",
259        target = "engine::tree::payload_processor::sparse_trie",
260        skip_all
261    )]
262    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
263        let now = Instant::now();
264
265        let mut total_idle_time = std::time::Duration::ZERO;
266        let mut idle_start = Instant::now();
267
268        loop {
269            let mut t = Instant::now();
            // `select_biased!` tries the arms in order, so queued updates take priority over
            // proof results.
270            crossbeam_channel::select_biased! {
271                recv(self.updates) -> message => {
272                    let wake = Instant::now();
273
274                    let update = match message {
275                        Ok(m) => m,
276                        Err(_) => {
277                            return Err(ParallelStateRootError::Other(
278                                "updates channel disconnected before state root calculation".to_string(),
279                            ))
280                        }
281                    };
282
283                    total_idle_time += wake.duration_since(idle_start);
284                    self.metrics
285                        .sparse_trie_channel_wait_duration_histogram
286                        .record(wake.duration_since(t));
287
288                    self.on_message(update);
289                    self.pending_updates += 1;
290                }
291                recv(self.proof_result_rx) -> message => {
292                    let phase_end = Instant::now();
293                    total_idle_time += phase_end.duration_since(idle_start);
294                    self.metrics
295                        .sparse_trie_channel_wait_duration_histogram
296                        .record(phase_end.duration_since(t));
297                    t = phase_end;
298
299                    let Ok(result) = message else {
300                        unreachable!("we own the sender half")
301                    };
302
                    // Merge every proof result that is already queued into the first one, so
                    // that they are revealed with a single `on_proof_result` call.
303                    let mut result = result.result?;
304                    while let Ok(next) = self.proof_result_rx.try_recv() {
305                        let res = next.result?;
306                        result.extend(res);
307                    }
308
309                    let phase_end = Instant::now();
310                    self.metrics
311                        .sparse_trie_proof_coalesce_duration_histogram
312                        .record(phase_end.duration_since(t));
313                    t = phase_end;
314
315                    self.on_proof_result(result)?;
316                    self.metrics
317                        .sparse_trie_reveal_multiproof_duration_histogram
318                        .record(t.elapsed());
319                },
320            }
321
322            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
323                // If we don't have any pending messages, we can spend some time on computing
324                // storage roots and promoting account updates.
325                self.dispatch_pending_targets();
326                t = Instant::now();
327                self.process_new_updates()?;
328                self.promote_pending_account_updates()?;
329                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
330
                // Done once the last state update was seen and no account or storage leaf
                // updates remain to be applied.
331                if self.finished_state_updates &&
332                    self.account_updates.is_empty() &&
333                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
334                {
335                    break;
336                }
337
338                self.dispatch_pending_targets();
339
340                // If there's still no pending updates spend some time pre-computing the account
341                // trie upper hashes
342                if self.proof_result_rx.is_empty() {
343                    self.trie.calculate_subtries();
344                }
345            } else if self.updates.is_empty() {
346                // No more updates are queued in the channel: apply the buffered updates to the trie.
347                t = Instant::now();
348                self.process_new_updates()?;
349                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
350                self.dispatch_pending_targets();
351            } else if self.pending_targets.len() > self.chunk_size {
352                // Make sure to dispatch targets if we've accumulated a lot of them.
353                self.dispatch_pending_targets();
354            }
355
356            idle_start = Instant::now();
357        }
358
359        self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
360
361        debug!(target: "engine::root", "All proofs processed, ending calculation");
362
363        let start = Instant::now();
364        let (state_root, trie_updates) = match self.trie.root_with_updates() {
365            Ok(result) => result,
366            Err(err)
367                if matches!(
368                    err.kind(),
369                    SparseStateTrieErrorKind::Sparse(SparseTrieErrorKind::Blind)
370                ) =>
371            {
372                // A still-blind account trie means this block never changed state, so preserve
373                // the cached parent root instead of fetching and revealing
374                // the unchanged root node.
375                (self.parent_state_root, TrieUpdates::default())
376            }
377            Err(err) => {
378                return Err(ParallelStateRootError::Other(format!(
379                    "could not calculate state root: {err:?}"
380                )))
381            }
382        };
383
384        #[cfg(feature = "trie-debug")]
385        let debug_recorders = self.trie.take_debug_recorders();
386
387        let end = Instant::now();
388        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
389        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
390
        // Report the accumulated cache statistics, then reset the counters.
391        self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
392        self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
393        self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
394        self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
395        self.account_cache_hits = 0;
396        self.account_cache_misses = 0;
397        self.storage_cache_hits = 0;
398        self.storage_cache_misses = 0;
399
400        Ok(StateRootComputeOutcome {
401            state_root,
402            trie_updates: Arc::new(trie_updates),
403            #[cfg(feature = "trie-debug")]
404            debug_recorders,
405        })
406    }
407
408    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
409    fn on_message(&mut self, message: SparseTrieTaskMessage) {
410        match message {
411            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
412            SparseTrieTaskMessage::HashedState(hashed_state) => {
413                self.on_hashed_state_update(hashed_state)
414            }
415            SparseTrieTaskMessage::FinishedStateUpdates => {
416                let _ = self
417                    .final_hashed_state_tx
418                    .take()
419                    .unwrap()
420                    .send(core::mem::take(&mut self.final_hashed_state));
421                self.finished_state_updates = true
422            }
423        }
424    }
425
426    #[instrument(
427        level = "trace",
428        target = "engine::tree::payload_processor::sparse_trie",
429        skip_all
430    )]
431    fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
432        for target in targets.account_targets {
433            // Only touch accounts that are not yet present in the updates set.
434            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
435        }
436
437        for (address, slots) in targets.storage_targets {
438            if !slots.is_empty() {
439                // Look up outer map once per address instead of once per slot.
440                let new_updates = self.new_storage_updates.entry(address).or_default();
441                for slot in slots {
442                    // Only touch storages that are not yet present in the updates set.
443                    new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
444                }
445            }
446
447            // Touch corresponding account leaf to make sure its revealed in accounts trie for
448            // storage root update.
449            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
450        }
451    }
452
453    /// Processes a hashed state update and encodes all state changes as trie updates.
454    #[instrument(
455        level = "trace",
456        target = "engine::tree::payload_processor::sparse_trie",
457        skip_all
458    )]
459    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
460        for (&address, storage) in &hashed_state_update.storages {
461            if !storage.storage.is_empty() {
462                // Look up outer maps once per address instead of once per slot.
463                let new_updates = self.new_storage_updates.entry(address).or_default();
464                let mut existing_updates = self.storage_updates.get_mut(&address);
465
466                for (&slot, &value) in &storage.storage {
467                    self.trie.record_slot_touch(address, slot);
468
469                    let encoded = if value.is_zero() {
470                        Vec::new()
471                    } else {
472                        alloy_rlp::encode_fixed_size(&value).to_vec()
473                    };
474                    new_updates.insert(slot, LeafUpdate::Changed(encoded));
475
476                    // Remove an existing storage update if it exists.
477                    if let Some(ref mut existing) = existing_updates {
478                        existing.remove(&slot);
479                    }
480                }
481            }
482
483            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
484            // trie for storage root update.
485            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
486
487            // Make sure account is tracked in `pending_account_updates` so that once storage root
488            // is computed, it will be updated in the accounts trie.
489            self.pending_account_updates.entry(address).or_insert(None);
490        }
491
492        for (&address, &account) in &hashed_state_update.accounts {
493            self.trie.record_account_touch(address);
494
495            // Track account as touched.
496            //
497            // This might overwrite an existing update, which is fine, because storage root from it
498            // is already tracked in the trie and can be easily fetched again.
499            self.new_account_updates.insert(address, LeafUpdate::Touched);
500
501            // Track account in `pending_account_updates` so that once storage root is computed,
502            // it will be updated in the accounts trie.
503            self.pending_account_updates.insert(address, Some(account));
504        }
505
506        self.final_hashed_state.extend(hashed_state_update);
507    }
508
509    fn on_proof_result(
510        &mut self,
511        result: DecodedMultiProofV2,
512    ) -> Result<(), ParallelStateRootError> {
513        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
514            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
515        })
516    }
517
518    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
519        if self.pending_updates == 0 {
520            return Ok(());
521        }
522
523        let _span = debug_span!("process_new_updates").entered();
524        self.pending_updates = 0;
525
526        // Firstly apply all new storage and account updates to the tries.
527        self.process_leaf_updates(true)?;
528
529        for (address, mut new) in self.new_storage_updates.drain() {
530            match self.storage_updates.entry(address) {
531                Entry::Vacant(entry) => {
532                    entry.insert(new); // insert the whole map at once, no per-slot loop
533                }
534                Entry::Occupied(mut entry) => {
535                    let updates = entry.get_mut();
536                    for (slot, new) in new.drain() {
537                        match updates.entry(slot) {
538                            Entry::Occupied(mut slot_entry) => {
539                                if new.is_changed() {
540                                    slot_entry.insert(new);
541                                }
542                            }
543                            Entry::Vacant(slot_entry) => {
544                                slot_entry.insert(new);
545                            }
546                        }
547                    }
548                }
549            }
550        }
551
552        for (address, new) in self.new_account_updates.drain() {
553            match self.account_updates.entry(address) {
554                Entry::Occupied(mut entry) => {
555                    if new.is_changed() {
556                        entry.insert(new);
557                    }
558                }
559                Entry::Vacant(entry) => {
560                    entry.insert(new);
561                }
562            }
563        }
564
565        Ok(())
566    }
567
568    /// Applies all account and storage leaf updates to corresponding tries and collects any new
569    /// multiproof targets.
570    #[instrument(
571        level = "trace",
572        target = "engine::tree::payload_processor::sparse_trie",
573        skip_all
574    )]
575    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
576        let storage_updates =
577            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
578
579        // Process all storage updates, skipping tries with no pending updates.
580        let span = trace_span!("process_storage_leaf_updates").entered();
581        for (address, updates) in storage_updates {
582            if updates.is_empty() {
583                continue;
584            }
585            let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
586
587            let trie = self.trie.get_or_create_storage_trie_mut(*address);
588            let fetched = self.fetched_storage_targets.entry(*address).or_default();
589            let mut targets = Vec::new();
590
591            let updates_len_before = updates.len();
592            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
593                Entry::Occupied(mut entry) => {
594                    if min_len < *entry.get() {
595                        entry.insert(min_len);
596                        targets.push(ProofV2Target::new(path).with_min_len(min_len));
597                    }
598                }
599                Entry::Vacant(entry) => {
600                    entry.insert(min_len);
601                    targets.push(ProofV2Target::new(path).with_min_len(min_len));
602                }
603            })?;
604            let updates_len_after = updates.len();
605            self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
606            self.storage_cache_misses += updates_len_after as u64;
607
608            if !targets.is_empty() {
609                self.pending_targets.extend_storage_targets(address, targets);
610            }
611        }
612
613        drop(span);
614
615        // Process account trie updates and fill the account targets.
616        self.process_account_leaf_updates(new)?;
617
618        Ok(())
619    }
620
621    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
622    ///
623    /// Returns whether any updates were drained (applied to the trie).
624    #[instrument(
625        level = "trace",
626        target = "engine::tree::payload_processor::sparse_trie",
627        skip_all
628    )]
629    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
630        let account_updates =
631            if new { &mut self.new_account_updates } else { &mut self.account_updates };
632
633        let updates_len_before = account_updates.len();
634
635        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
636            match self.fetched_account_targets.entry(target) {
637                Entry::Occupied(mut entry) => {
638                    if min_len < *entry.get() {
639                        entry.insert(min_len);
640                        self.pending_targets
641                            .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
642                    }
643                }
644                Entry::Vacant(entry) => {
645                    entry.insert(min_len);
646                    self.pending_targets
647                        .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
648                }
649            }
650        })?;
651
652        let updates_len_after = account_updates.len();
653        self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
654        self.account_cache_misses += updates_len_after as u64;
655
656        Ok(updates_len_after < updates_len_before)
657    }
658
659    /// Computes storage roots for accounts whose storage updates are fully drained.
660    ///
661    /// For each storage trie T that:
662    /// 1. was modified in the current block,
663    /// 2. all the storage updates are fully drained,
664    /// 3. but the storage root hasn't been updated yet,
665    ///
666    /// we trigger storage root computation on a rayon pool.
    ///
    /// # Panics
    ///
    /// Panics if computing the root of any of the selected storage tries fails.
667    fn compute_drained_storage_roots(&mut self) {
        // Accounts whose tracked storage update map is empty, i.e. fully drained.
668        let addresses_to_compute_roots: Vec<_> = self
669            .storage_updates
670            .iter()
671            .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
672            .collect();
673
        // Raw-pointer wrapper used to hand each storage trie to a rayon task.
674        struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
675        // SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
676        // documented at the use site below.
677        unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}
678
        // Keep only the tries that exist and do not have their root cached yet.
679        let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
680            Vec::with_capacity(addresses_to_compute_roots.len());
681        for address in addresses_to_compute_roots {
682            if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
683                !trie.is_root_cached()
684            {
685                tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
686            }
687        }
688
689        if tries_to_compute_roots.is_empty() {
690            return;
691        }
692
693        let parent_span =
694            debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
695        tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
            // The address is only attached to the span when TRACE level is enabled.
696            let span = if tracing::enabled!(tracing::Level::TRACE) {
697                debug_span!(
698                    target: "engine::tree::payload_processor::sparse_trie",
699                    parent: &parent_span,
700                    "storage_root",
701                    ?address
702                )
703            } else {
704                debug_span!(
705                    target: "engine::tree::payload_processor::sparse_trie",
706                    parent: &parent_span,
707                    "storage_root",
708                )
709            };
710            let _enter = span.entered();
711            // SAFETY:
712            // - pointers are created from `storage_tries_mut().get_mut(address)` above;
713            // - `addresses_to_compute_roots` comes from map iteration, so addresses are unique;
714            // - we do not insert/remove entries between pointer collection and use, so pointers
715            //   stay valid and map reallocation cannot occur;
716            // - each pointer is consumed by at most one rayon task, so no aliasing mutable access.
717            unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
718        });
719    }
720
721    /// Iterates through all storage tries for which all updates were processed, computes their
722    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
723    /// for accounts trie.
724    #[instrument(
725        level = "trace",
726        target = "engine::tree::payload_processor::sparse_trie",
727        skip_all
728    )]
729    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
730        self.process_leaf_updates(false)?;
731
732        if self.pending_account_updates.is_empty() {
733            return Ok(());
734        }
735
736        self.compute_drained_storage_roots();
737
738        loop {
739            let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
740            // Now handle pending account updates that can be upgraded to a proper update.
741            let account_rlp_buf = &mut self.account_rlp_buf;
742            let mut num_promoted = 0;
743            self.pending_account_updates.retain(|addr, account| {
744                if let Some(updates) = self.storage_updates.get(addr) {
745                    if !updates.is_empty() {
746                        // If account has pending storage updates, it is still pending.
747                        return true;
748                    } else if let Some(account) = account.take() {
749                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
750                        let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
751                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
752                        num_promoted += 1;
753                        return false;
754                    }
755                }
756
757                // Get the current account state either from the trie or from latest account update.
758                let trie_account = match self.account_updates.get(addr) {
759                    Some(LeafUpdate::Changed(encoded)) => {
760                        Some(encoded).filter(|encoded| !encoded.is_empty())
761                    }
762                    // Needs to be revealed first
763                    Some(LeafUpdate::Touched) => return true,
764                    None => self.trie.get_account_value(addr),
765                };
766
767                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
768
769                let (account, storage_root) = if let Some(account) = account.take() {
770                    // If account is Some(_) here it means it didn't have any storage updates
771                    // and we can fetch the storage root directly from the account trie.
772                    //
773                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
774                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
775
776                    (account, storage_root)
777                } else {
778                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
779                };
780
781                let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
782                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
783                num_promoted += 1;
784
785                false
786            });
787            span.record("promoted", num_promoted);
788            drop(span);
789
790            // Only exit when no new updates are processed.
791            //
792            // We need to keep iterating if any updates are being drained because that might
793            // indicate that more pending account updates can be promoted.
794            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
795                break
796            }
797        }
798
799        Ok(())
800    }
801
802    fn dispatch_pending_targets(&mut self) {
803        if self.pending_targets.is_empty() {
804            return;
805        }
806
807        let _span = trace_span!("dispatch_pending_targets").entered();
808        let (targets, chunking_length) = self.pending_targets.take();
809        dispatch_with_chunking(
810            targets,
811            chunking_length,
812            self.chunk_size,
813            self.max_targets_for_chunking,
814            self.proof_worker_handle.has_multiple_idle_account_workers(),
815            self.proof_worker_handle.has_multiple_idle_storage_workers(),
816            MultiProofTargetsV2::chunks,
817            |proof_targets| {
818                if let Err(e) =
819                    self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
820                        targets: proof_targets,
821                        proof_result_sender: ProofResultContext::new(
822                            self.proof_result_tx.clone(),
823                            HashedPostState::default(),
824                            Instant::now(),
825                        ),
826                    })
827                {
828                    error!("failed to dispatch account multiproof: {e:?}");
829                }
830            },
831        );
832    }
833}
834
835/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
836fn encode_account_leaf_value(
837    account: Option<Account>,
838    storage_root: B256,
839    account_rlp_buf: &mut Vec<u8>,
840) -> Vec<u8> {
841    if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
842        return Vec::new();
843    }
844
845    account_rlp_buf.clear();
846    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
847    account_rlp_buf.clone()
848}
849
850/// Pending proof targets queued for dispatch to proof workers, along with their count.
851#[derive(Default)]
852struct PendingTargets {
853    /// The proof targets.
854    targets: MultiProofTargetsV2,
855    /// Number of account + storage proof targets currently queued.
856    len: usize,
857}
858
859impl PendingTargets {
860    /// Returns the number of pending targets.
861    const fn len(&self) -> usize {
862        self.len
863    }
864
865    /// Returns `true` if there are no pending targets.
866    const fn is_empty(&self) -> bool {
867        self.len == 0
868    }
869
870    /// Takes the pending targets, replacing with empty defaults.
871    fn take(&mut self) -> (MultiProofTargetsV2, usize) {
872        (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
873    }
874
875    /// Adds a target to the account targets.
876    fn push_account_target(&mut self, target: ProofV2Target) {
877        self.targets.account_targets.push(target);
878        self.len += 1;
879    }
880
881    /// Extends storage targets for the given address.
882    fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
883        self.len += targets.len();
884        self.targets.storage_targets.entry(*address).or_default().extend(targets);
885    }
886}
887
888/// Message type for the sparse trie task.
889enum SparseTrieTaskMessage {
890    /// A hashed state update ready to be processed.
891    HashedState(HashedPostState),
892    /// Prefetch proof targets (passed through directly).
893    PrefetchProofs(MultiProofTargetsV2),
894    /// Signals that all state updates have been received.
895    FinishedStateUpdates,
896}
897
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_provider::{
        providers::{OverlayBuilder, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory,
        ChainSpecProvider,
    };
    use reth_trie_db::ChangesetCache;
    use reth_trie_parallel::proof_task::ProofTaskCtx;
    use reth_trie_sparse::ArenaParallelSparseTrie;

    /// A hashed state update fed to the hashing task must come out as a `HashedState` message
    /// carrying the same account and storage data, followed by `FinishedStateUpdates`.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);
        let sent_account = Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

        let mut sent_storage = reth_trie::HashedStorage::new(false);
        sent_storage.storage.insert(hashed_slot, slot_value);
        let mut hashed_state = HashedPostState::default();
        hashed_state.accounts.insert(hashed_address, Some(sent_account));
        hashed_state.storages.insert(hashed_address, sent_storage);

        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
        let hashing_task = std::thread::spawn(move || {
            SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
                updates_rx,
                hashed_state_tx,
                MultiProofTaskMetrics::default(),
            );
        });

        updates_tx.send(StateRootMessage::HashedStateUpdate(hashed_state)).unwrap();
        updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
        drop(updates_tx);

        let forwarded = match hashed_state_rx.recv().unwrap() {
            SparseTrieTaskMessage::HashedState(state) => state,
            _ => panic!("expected HashedState message"),
        };

        let forwarded_account = forwarded.accounts.get(&hashed_address).unwrap().unwrap();
        assert_eq!(forwarded_account.balance, sent_account.balance);
        assert_eq!(forwarded_account.nonce, sent_account.nonce);

        let forwarded_storage = forwarded.storages.get(&hashed_address).unwrap();
        assert_eq!(forwarded_storage.storage.get(&hashed_slot), Some(&slot_value));

        let terminator = hashed_state_rx.recv().unwrap();
        assert!(matches!(terminator, SparseTrieTaskMessage::FinishedStateUpdates));

        // No further messages: the channel must be disconnected afterwards.
        assert!(hashed_state_rx.recv().is_err());
        hashing_task.join().unwrap();
    }

    /// No account together with the empty storage root encodes to an empty leaf value.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let sentinel = vec![0xAB];
        let mut account_rlp_buf = sentinel.clone();

        let encoded = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut account_rlp_buf);

        assert!(encoded.is_empty());
        // The early return must leave the caller's buffer as it was.
        assert_eq!(account_rlp_buf, sentinel);
    }

    /// A non-empty account is encoded as `TrieAccount` RLP, and the scratch buffer ends up
    /// holding exactly the returned bytes.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let nonce = 7;
        let balance = U256::from(42);
        let storage_root = B256::from([0x99; 32]);
        let account = Account { nonce, balance, bytecode_hash: Some(B256::from([0xAA; 32])) };
        let mut account_rlp_buf = vec![0x00, 0x01];

        let encoded = encode_account_leaf_value(Some(account), storage_root, &mut account_rlp_buf);
        let decoded = TrieAccount::decode(&mut &encoded[..]).expect("valid account RLP");

        assert_eq!(decoded.nonce, nonce);
        assert_eq!(decoded.balance, balance);
        assert_eq!(decoded.storage_root, storage_root);
        assert_eq!(account_rlp_buf, encoded);
    }

    /// With no state updates at all, `run` must return the parent state root and leave the
    /// blind accounts trie unrevealed.
    #[test]
    fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
        let runtime = reth_tasks::Runtime::test();
        let provider_factory = create_test_provider_factory();
        let anchor_hash = provider_factory.chain_spec().genesis_hash();
        let overlay_builder = OverlayBuilder::<reth_chain_state::EthPrimitives>::new(
            anchor_hash,
            ChangesetCache::new(),
        );
        let overlay_factory = OverlayStateProviderFactory::new(provider_factory, overlay_builder);
        let proof_task_ctx = ProofTaskCtx::new(overlay_factory);
        let proof_worker_handle = ProofWorkerHandle::new(&runtime, proof_task_ctx, false);

        let blind_trie = RevealableSparseTrie::blind_from(ArenaParallelSparseTrie::default());
        let trie = SparseStateTrie::default()
            .with_accounts_trie(blind_trie.clone())
            .with_default_storage_trie(blind_trie)
            .with_updates(true);

        let parent_state_root = B256::from([0x55; 32]);
        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let mut task = SparseTrieCacheTask::new_with_trie(
            &runtime,
            updates_rx,
            std::sync::mpsc::channel().0,
            proof_worker_handle,
            MultiProofTaskMetrics::default(),
            trie,
            parent_state_root,
            1,
        );

        // Signal the end of updates without ever sending one.
        updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
        drop(updates_tx);

        let outcome = task.run().expect("state root computation should succeed");

        assert_eq!(outcome.state_root, parent_state_root);
        assert!(outcome.trie_updates.is_empty());
        assert!(task.trie.state_trie_ref().is_none(), "blind trie should not be revealed");
    }
}