Skip to main content

reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use std::sync::Arc;
4
5use crate::tree::{
6    multiproof::{
7        dispatch_with_chunking, evm_state_to_hashed_post_state, StateRootComputeOutcome,
8        StateRootMessage, DEFAULT_MAX_TARGETS_FOR_CHUNKING,
9    },
10    payload_processor::multiproof::MultiProofTaskMetrics,
11};
12use alloy_primitives::B256;
13use alloy_rlp::{Decodable, Encodable};
14use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
15use rayon::iter::{IntoParallelIterator, ParallelIterator};
16use reth_primitives_traits::{Account, FastInstant as Instant};
17use reth_tasks::Runtime;
18use reth_trie::{
19    updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, TrieAccount, EMPTY_ROOT_HASH,
20    TRIE_ACCOUNT_RLP_MAX_SIZE,
21};
22use reth_trie_common::{MultiProofTargetsV2, ProofV2Target};
23use reth_trie_parallel::{
24    proof_task::{
25        AccountMultiproofInput, ProofResultContext, ProofResultMessage, ProofWorkerHandle,
26    },
27    root::ParallelStateRootError,
28};
29use reth_trie_sparse::{
30    errors::{SparseStateTrieErrorKind, SparseTrieErrorKind, SparseTrieResult},
31    ConfigurableSparseTrie, DeferredDrops, LeafUpdate, RevealableSparseTrie, SparseStateTrie,
32    SparseTrie,
33};
34use revm_primitives::{hash_map::Entry, B256Map};
35use tracing::{debug, debug_span, error, instrument, trace_span};
36
/// Maximum number of pending execution/prewarm updates that we accumulate in memory before
/// actually applying them to the trie.
///
/// Once more than this many messages have been buffered, the task applies them even while
/// further updates are still queued on the channel.
const MAX_PENDING_UPDATES: usize = 100;
39
/// Sparse trie task implementation that uses in-memory sparse trie data to schedule proof fetching.
///
/// `A` is the account trie implementation and `S` the storage trie implementation used by the
/// wrapped [`SparseStateTrie`].
pub(super) struct SparseTrieCacheTask<A = ConfigurableSparseTrie, S = ConfigurableSparseTrie> {
    /// Sender for proof results.
    proof_result_tx: CrossbeamSender<ProofResultMessage>,
    /// Receiver for proof results directly from workers.
    proof_result_rx: CrossbeamReceiver<ProofResultMessage>,
    /// Receives updates from execution and prewarming.
    updates: CrossbeamReceiver<SparseTrieTaskMessage>,
    /// Sender half for the channel to send final hashed state to.
    ///
    /// Taken out of the option when the final hashed state is sent.
    final_hashed_state_tx: Option<std::sync::mpsc::Sender<HashedPostState>>,
    /// `SparseStateTrie` used for computing the state root.
    trie: SparseStateTrie<A, S>,
    /// The parent block's state root.
    parent_state_root: B256,
    /// Handle to the proof worker pools (storage and account).
    proof_worker_handle: ProofWorkerHandle,

    /// The size of proof targets chunk to spawn in one calculation.
    ///
    /// Also used as the threshold of accumulated pending targets above which they are dispatched
    /// even while more updates are still queued.
    ///
    /// NOTE(review): earlier docs said `None` disables chunking, but this field is a plain
    /// `usize` — confirm how (or whether) chunking can be disabled.
    chunk_size: usize,
    /// If this number is exceeded and chunking is enabled, then this will override whether or not
    /// there are any active workers and force chunking across workers. This is to prevent tasks
    /// which are very long from hitting a single worker.
    max_targets_for_chunking: usize,

    /// Account trie updates.
    account_updates: B256Map<LeafUpdate>,
    /// Storage trie updates. hashed address -> slot -> update.
    storage_updates: B256Map<B256Map<LeafUpdate>>,

    /// Account updates that are buffered but were not yet applied to the trie.
    new_account_updates: B256Map<LeafUpdate>,
    /// Storage updates that are buffered but were not yet applied to the trie.
    new_storage_updates: B256Map<B256Map<LeafUpdate>>,
    /// Account updates that are blocked by storage root calculation or account reveal.
    ///
    /// Those are being moved into `account_updates` once storage roots
    /// are revealed and/or calculated.
    ///
    /// Invariant: for each entry in `pending_account_updates` account must either be already
    /// revealed in the trie or have an entry in `account_updates`.
    ///
    /// Values can be either of:
    ///   - None: account had a storage update and is awaiting storage root calculation and/or
    ///     account node reveal to complete.
    ///   - Some(_): account was changed/destroyed and is awaiting storage root calculation/reveal
    ///     to complete.
    pending_account_updates: B256Map<Option<Option<Account>>>,
    /// Cache of account proof targets that were already fetched/requested from the proof workers.
    /// account -> lowest `min_len` requested.
    fetched_account_targets: B256Map<u8>,
    /// Cache of storage proof targets that have already been fetched/requested from the proof
    /// workers. account -> slot -> lowest `min_len` requested.
    fetched_storage_targets: B256Map<B256Map<u8>>,
    /// Reusable buffer for RLP encoding of accounts.
    account_rlp_buf: Vec<u8>,
    /// Whether the last state update has been received.
    finished_state_updates: bool,
    /// Accumulated account leaf update cache hits.
    account_cache_hits: u64,
    /// Accumulated account leaf update cache misses.
    account_cache_misses: u64,
    /// Accumulated storage leaf update cache hits.
    storage_cache_hits: u64,
    /// Accumulated storage leaf update cache misses.
    storage_cache_misses: u64,
    /// Pending proof targets queued for dispatch to proof workers.
    pending_targets: PendingTargets,
    /// Number of pending execution/prewarming updates received but not yet passed to
    /// `update_leaves`.
    pending_updates: usize,
    /// Combined final hashed state.
    ///
    /// Sparse trie task observes and hashes all state updates, allowing it to cheaply construct a
    /// final [`HashedPostState`] and share it with main engine thread without requiring any extra
    /// hashing work.
    final_hashed_state: HashedPostState,

    /// Metrics for the sparse trie.
    metrics: MultiProofTaskMetrics,
}
121
122impl<A, S> SparseTrieCacheTask<A, S>
123where
124    A: SparseTrie + Default,
125    S: SparseTrie + Default + Clone,
126{
127    /// Creates a new sparse trie, pre-populating with an existing [`SparseStateTrie`].
128    #[expect(clippy::too_many_arguments)]
129    pub(super) fn new_with_trie(
130        executor: &Runtime,
131        updates: CrossbeamReceiver<StateRootMessage>,
132        final_hashed_state_tx: std::sync::mpsc::Sender<HashedPostState>,
133        proof_worker_handle: ProofWorkerHandle,
134        metrics: MultiProofTaskMetrics,
135        trie: SparseStateTrie<A, S>,
136        parent_state_root: B256,
137        chunk_size: usize,
138    ) -> Self {
139        let (proof_result_tx, proof_result_rx) = crossbeam_channel::unbounded();
140        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
141
142        let parent_span = tracing::Span::current();
143        let hashing_metrics = metrics.clone();
144        executor.spawn_blocking_named("trie-hashing", move || {
145            let _span = trace_span!(parent: parent_span, "run_hashing_task").entered();
146            Self::run_hashing_task(updates, hashed_state_tx, hashing_metrics)
147        });
148
149        Self {
150            proof_result_tx,
151            proof_result_rx,
152            updates: hashed_state_rx,
153            proof_worker_handle,
154            final_hashed_state_tx: Some(final_hashed_state_tx),
155            trie,
156            parent_state_root,
157            chunk_size,
158            max_targets_for_chunking: DEFAULT_MAX_TARGETS_FOR_CHUNKING,
159            account_updates: Default::default(),
160            storage_updates: Default::default(),
161            new_account_updates: Default::default(),
162            new_storage_updates: Default::default(),
163            pending_account_updates: Default::default(),
164            fetched_account_targets: Default::default(),
165            fetched_storage_targets: Default::default(),
166            account_rlp_buf: Vec::with_capacity(TRIE_ACCOUNT_RLP_MAX_SIZE),
167            finished_state_updates: Default::default(),
168            account_cache_hits: 0,
169            account_cache_misses: 0,
170            storage_cache_hits: 0,
171            storage_cache_misses: 0,
172            pending_targets: Default::default(),
173            pending_updates: Default::default(),
174            final_hashed_state: Default::default(),
175            metrics,
176        }
177    }
178
179    /// Runs the hashing task that drains updates from the channel and converts them to
180    /// `HashedPostState` in parallel.
181    fn run_hashing_task(
182        updates: CrossbeamReceiver<StateRootMessage>,
183        hashed_state_tx: CrossbeamSender<SparseTrieTaskMessage>,
184        metrics: MultiProofTaskMetrics,
185    ) {
186        let mut total_idle_time = std::time::Duration::ZERO;
187        let mut idle_start = Instant::now();
188
189        while let Ok(message) = updates.recv() {
190            total_idle_time += idle_start.elapsed();
191
192            let msg = match message {
193                StateRootMessage::PrefetchProofs(targets) => {
194                    SparseTrieTaskMessage::PrefetchProofs(targets)
195                }
196                StateRootMessage::StateUpdate(state) => {
197                    let _span = trace_span!(target: "engine::tree::payload_processor::sparse_trie", "hashing_state_update", n = state.len()).entered();
198                    let hashed = evm_state_to_hashed_post_state(state);
199                    SparseTrieTaskMessage::HashedState(hashed)
200                }
201                StateRootMessage::FinishedStateUpdates => {
202                    SparseTrieTaskMessage::FinishedStateUpdates
203                }
204                StateRootMessage::BlockAccessList(_) => {
205                    idle_start = Instant::now();
206                    continue;
207                }
208                StateRootMessage::HashedStateUpdate(state) => {
209                    SparseTrieTaskMessage::HashedState(state)
210                }
211            };
212            if hashed_state_tx.send(msg).is_err() {
213                break;
214            }
215
216            idle_start = Instant::now();
217        }
218
219        metrics.hashing_task_idle_time_seconds.record(total_idle_time.as_secs_f64());
220    }
221
222    /// Prunes and shrinks the trie for reuse in the next payload built on top of this one.
223    ///
224    /// Should be called after the state root result has been sent.
225    ///
226    /// When `disable_pruning` is true, the trie is preserved without any node pruning,
227    /// storage trie eviction, or capacity shrinking, keeping the full cache intact for
228    /// benchmarking purposes.
229    pub(super) fn into_trie_for_reuse(
230        self,
231        max_hot_slots: usize,
232        max_hot_accounts: usize,
233        max_nodes_capacity: usize,
234        max_values_capacity: usize,
235        disable_pruning: bool,
236        updates: &TrieUpdates,
237    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
238        let Self { mut trie, .. } = self;
239        trie.commit_updates(updates);
240        if !disable_pruning {
241            trie.prune(max_hot_slots, max_hot_accounts);
242            trie.shrink_to(max_nodes_capacity, max_values_capacity);
243        }
244        let deferred = trie.take_deferred_drops();
245        (trie, deferred)
246    }
247
248    /// Clears and shrinks the trie, discarding all state.
249    ///
250    /// Use this when the payload was invalid or cancelled - we don't want to preserve
251    /// potentially invalid trie state, but we keep the allocations for reuse.
252    pub(super) fn into_cleared_trie(
253        self,
254        max_nodes_capacity: usize,
255        max_values_capacity: usize,
256    ) -> (SparseStateTrie<A, S>, DeferredDrops) {
257        let Self { mut trie, .. } = self;
258        trie.clear();
259        trie.shrink_to(max_nodes_capacity, max_values_capacity);
260        let deferred = trie.take_deferred_drops();
261        (trie, deferred)
262    }
263
264    /// Runs the sparse trie task to completion.
265    ///
266    /// This waits for new incoming [`SparseTrieTaskMessage`]s, applies updates
267    /// to the trie and schedules proof fetching when needed.
268    ///
269    /// This concludes once the last state update has been received and processed.
270    #[instrument(
271        name = "SparseTrieCacheTask::run",
272        level = "debug",
273        target = "engine::tree::payload_processor::sparse_trie",
274        skip_all
275    )]
276    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
277        let now = Instant::now();
278
279        let mut total_idle_time = std::time::Duration::ZERO;
280        let mut idle_start = Instant::now();
281
282        loop {
283            let mut t = Instant::now();
284            crossbeam_channel::select_biased! {
285                recv(self.updates) -> message => {
286                    let wake = Instant::now();
287
288                    let update = match message {
289                        Ok(m) => m,
290                        Err(_) => {
291                            return Err(ParallelStateRootError::Other(
292                                "updates channel disconnected before state root calculation".to_string(),
293                            ))
294                        }
295                    };
296
297                    total_idle_time += wake.duration_since(idle_start);
298                    self.metrics
299                        .sparse_trie_channel_wait_duration_histogram
300                        .record(wake.duration_since(t));
301
302                    self.on_message(update);
303                    self.pending_updates += 1;
304                }
305                recv(self.proof_result_rx) -> message => {
306                    let phase_end = Instant::now();
307                    total_idle_time += phase_end.duration_since(idle_start);
308                    self.metrics
309                        .sparse_trie_channel_wait_duration_histogram
310                        .record(phase_end.duration_since(t));
311                    t = phase_end;
312
313                    let Ok(result) = message else {
314                        unreachable!("we own the sender half")
315                    };
316
317                    let mut result = result.result?;
318                    while let Ok(next) = self.proof_result_rx.try_recv() {
319                        let res = next.result?;
320                        result.extend(res);
321                    }
322
323                    let phase_end = Instant::now();
324                    self.metrics
325                        .sparse_trie_proof_coalesce_duration_histogram
326                        .record(phase_end.duration_since(t));
327                    t = phase_end;
328
329                    self.on_proof_result(result)?;
330                    self.metrics
331                        .sparse_trie_reveal_multiproof_duration_histogram
332                        .record(t.elapsed());
333                },
334            }
335
336            if self.updates.is_empty() && self.proof_result_rx.is_empty() {
337                // If we don't have any pending messages, we can spend some time on computing
338                // storage roots and promoting account updates.
339                self.dispatch_pending_targets();
340                t = Instant::now();
341                self.process_new_updates()?;
342                self.promote_pending_account_updates()?;
343                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
344
345                if self.finished_state_updates &&
346                    self.account_updates.is_empty() &&
347                    self.storage_updates.iter().all(|(_, updates)| updates.is_empty())
348                {
349                    break;
350                }
351
352                self.dispatch_pending_targets();
353
354                // If there's still no pending updates spend some time pre-computing the account
355                // trie upper hashes
356                if self.proof_result_rx.is_empty() {
357                    self.trie.calculate_subtries();
358                }
359            } else if self.updates.is_empty() || self.pending_updates > MAX_PENDING_UPDATES {
360                // If we don't have any pending updates OR we've accumulated a lot already, apply
361                // them to the trie,
362                t = Instant::now();
363                self.process_new_updates()?;
364                self.metrics.sparse_trie_process_updates_duration_histogram.record(t.elapsed());
365                self.dispatch_pending_targets();
366            } else if self.pending_targets.len() > self.chunk_size {
367                // Make sure to dispatch targets if we've accumulated a lot of them.
368                self.dispatch_pending_targets();
369            }
370
371            idle_start = Instant::now();
372        }
373
374        self.metrics.sparse_trie_idle_time_seconds.record(total_idle_time.as_secs_f64());
375
376        debug!(target: "engine::root", "All proofs processed, ending calculation");
377
378        let start = Instant::now();
379        let (state_root, trie_updates) = match self.trie.root_with_updates() {
380            Ok(result) => result,
381            Err(err)
382                if matches!(
383                    err.kind(),
384                    SparseStateTrieErrorKind::Sparse(SparseTrieErrorKind::Blind)
385                ) =>
386            {
387                // A still-blind account trie means this block never changed state, so preserve
388                // the cached parent root instead of fetching and revealing
389                // the unchanged root node.
390                (self.parent_state_root, TrieUpdates::default())
391            }
392            Err(err) => {
393                return Err(ParallelStateRootError::Other(format!(
394                    "could not calculate state root: {err:?}"
395                )))
396            }
397        };
398
399        #[cfg(feature = "trie-debug")]
400        let debug_recorders = self.trie.take_debug_recorders();
401
402        let end = Instant::now();
403        self.metrics.sparse_trie_final_update_duration_histogram.record(end.duration_since(start));
404        self.metrics.sparse_trie_total_duration_histogram.record(end.duration_since(now));
405
406        self.metrics.sparse_trie_account_cache_hits.record(self.account_cache_hits as f64);
407        self.metrics.sparse_trie_account_cache_misses.record(self.account_cache_misses as f64);
408        self.metrics.sparse_trie_storage_cache_hits.record(self.storage_cache_hits as f64);
409        self.metrics.sparse_trie_storage_cache_misses.record(self.storage_cache_misses as f64);
410        self.account_cache_hits = 0;
411        self.account_cache_misses = 0;
412        self.storage_cache_hits = 0;
413        self.storage_cache_misses = 0;
414
415        Ok(StateRootComputeOutcome {
416            state_root,
417            trie_updates: Arc::new(trie_updates),
418            #[cfg(feature = "trie-debug")]
419            debug_recorders,
420        })
421    }
422
423    /// Processes a [`SparseTrieTaskMessage`] from the hashing task.
424    fn on_message(&mut self, message: SparseTrieTaskMessage) {
425        match message {
426            SparseTrieTaskMessage::PrefetchProofs(targets) => self.on_prewarm_targets(targets),
427            SparseTrieTaskMessage::HashedState(hashed_state) => {
428                self.on_hashed_state_update(hashed_state)
429            }
430            SparseTrieTaskMessage::FinishedStateUpdates => {
431                let _ = self
432                    .final_hashed_state_tx
433                    .take()
434                    .unwrap()
435                    .send(core::mem::take(&mut self.final_hashed_state));
436                self.finished_state_updates = true
437            }
438        }
439    }
440
441    #[instrument(
442        level = "trace",
443        target = "engine::tree::payload_processor::sparse_trie",
444        skip_all
445    )]
446    fn on_prewarm_targets(&mut self, targets: MultiProofTargetsV2) {
447        for target in targets.account_targets {
448            // Only touch accounts that are not yet present in the updates set.
449            self.new_account_updates.entry(target.key()).or_insert(LeafUpdate::Touched);
450        }
451
452        for (address, slots) in targets.storage_targets {
453            if !slots.is_empty() {
454                // Look up outer map once per address instead of once per slot.
455                let new_updates = self.new_storage_updates.entry(address).or_default();
456                for slot in slots {
457                    // Only touch storages that are not yet present in the updates set.
458                    new_updates.entry(slot.key()).or_insert(LeafUpdate::Touched);
459                }
460            }
461
462            // Touch corresponding account leaf to make sure its revealed in accounts trie for
463            // storage root update.
464            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
465        }
466    }
467
468    /// Processes a hashed state update and encodes all state changes as trie updates.
469    #[instrument(
470        level = "trace",
471        target = "engine::tree::payload_processor::sparse_trie",
472        skip_all
473    )]
474    fn on_hashed_state_update(&mut self, hashed_state_update: HashedPostState) {
475        for (&address, storage) in &hashed_state_update.storages {
476            if !storage.storage.is_empty() {
477                // Look up outer maps once per address instead of once per slot.
478                let new_updates = self.new_storage_updates.entry(address).or_default();
479                let mut existing_updates = self.storage_updates.get_mut(&address);
480
481                for (&slot, &value) in &storage.storage {
482                    self.trie.record_slot_touch(address, slot);
483
484                    let encoded = if value.is_zero() {
485                        Vec::new()
486                    } else {
487                        alloy_rlp::encode_fixed_size(&value).to_vec()
488                    };
489                    new_updates.insert(slot, LeafUpdate::Changed(encoded));
490
491                    // Remove an existing storage update if it exists.
492                    if let Some(ref mut existing) = existing_updates {
493                        existing.remove(&slot);
494                    }
495                }
496            }
497
498            // Make sure account is tracked in `account_updates` so that it is revealed in accounts
499            // trie for storage root update.
500            self.new_account_updates.entry(address).or_insert(LeafUpdate::Touched);
501
502            // Make sure account is tracked in `pending_account_updates` so that once storage root
503            // is computed, it will be updated in the accounts trie.
504            self.pending_account_updates.entry(address).or_insert(None);
505        }
506
507        for (&address, &account) in &hashed_state_update.accounts {
508            self.trie.record_account_touch(address);
509
510            // Track account as touched.
511            //
512            // This might overwrite an existing update, which is fine, because storage root from it
513            // is already tracked in the trie and can be easily fetched again.
514            self.new_account_updates.insert(address, LeafUpdate::Touched);
515
516            // Track account in `pending_account_updates` so that once storage root is computed,
517            // it will be updated in the accounts trie.
518            self.pending_account_updates.insert(address, Some(account));
519        }
520
521        self.final_hashed_state.extend(hashed_state_update);
522    }
523
524    fn on_proof_result(
525        &mut self,
526        result: DecodedMultiProofV2,
527    ) -> Result<(), ParallelStateRootError> {
528        self.trie.reveal_decoded_multiproof_v2(result).map_err(|e| {
529            ParallelStateRootError::Other(format!("could not reveal multiproof: {e:?}"))
530        })
531    }
532
533    fn process_new_updates(&mut self) -> SparseTrieResult<()> {
534        if self.pending_updates == 0 {
535            return Ok(());
536        }
537
538        let _span = debug_span!("process_new_updates").entered();
539        self.pending_updates = 0;
540
541        // Firstly apply all new storage and account updates to the tries.
542        self.process_leaf_updates(true)?;
543
544        for (address, mut new) in self.new_storage_updates.drain() {
545            match self.storage_updates.entry(address) {
546                Entry::Vacant(entry) => {
547                    entry.insert(new); // insert the whole map at once, no per-slot loop
548                }
549                Entry::Occupied(mut entry) => {
550                    let updates = entry.get_mut();
551                    for (slot, new) in new.drain() {
552                        match updates.entry(slot) {
553                            Entry::Occupied(mut slot_entry) => {
554                                if new.is_changed() {
555                                    slot_entry.insert(new);
556                                }
557                            }
558                            Entry::Vacant(slot_entry) => {
559                                slot_entry.insert(new);
560                            }
561                        }
562                    }
563                }
564            }
565        }
566
567        for (address, new) in self.new_account_updates.drain() {
568            match self.account_updates.entry(address) {
569                Entry::Occupied(mut entry) => {
570                    if new.is_changed() {
571                        entry.insert(new);
572                    }
573                }
574                Entry::Vacant(entry) => {
575                    entry.insert(new);
576                }
577            }
578        }
579
580        Ok(())
581    }
582
583    /// Applies all account and storage leaf updates to corresponding tries and collects any new
584    /// multiproof targets.
585    #[instrument(
586        level = "trace",
587        target = "engine::tree::payload_processor::sparse_trie",
588        skip_all
589    )]
590    fn process_leaf_updates(&mut self, new: bool) -> SparseTrieResult<()> {
591        let storage_updates =
592            if new { &mut self.new_storage_updates } else { &mut self.storage_updates };
593
594        // Process all storage updates, skipping tries with no pending updates.
595        let span = trace_span!("process_storage_leaf_updates").entered();
596        for (address, updates) in storage_updates {
597            if updates.is_empty() {
598                continue;
599            }
600            let _enter = trace_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage_trie_leaf_updates", a=%address).entered();
601
602            let trie = self.trie.get_or_create_storage_trie_mut(*address);
603            let fetched = self.fetched_storage_targets.entry(*address).or_default();
604            let mut targets = Vec::new();
605
606            let updates_len_before = updates.len();
607            trie.update_leaves(updates, |path, min_len| match fetched.entry(path) {
608                Entry::Occupied(mut entry) => {
609                    if min_len < *entry.get() {
610                        entry.insert(min_len);
611                        targets.push(ProofV2Target::new(path).with_min_len(min_len));
612                    }
613                }
614                Entry::Vacant(entry) => {
615                    entry.insert(min_len);
616                    targets.push(ProofV2Target::new(path).with_min_len(min_len));
617                }
618            })?;
619            let updates_len_after = updates.len();
620            self.storage_cache_hits += (updates_len_before - updates_len_after) as u64;
621            self.storage_cache_misses += updates_len_after as u64;
622
623            if !targets.is_empty() {
624                self.pending_targets.extend_storage_targets(address, targets);
625            }
626        }
627
628        drop(span);
629
630        // Process account trie updates and fill the account targets.
631        self.process_account_leaf_updates(new)?;
632
633        Ok(())
634    }
635
636    /// Invokes `update_leaves` for the accounts trie and collects any new targets.
637    ///
638    /// Returns whether any updates were drained (applied to the trie).
639    #[instrument(
640        level = "trace",
641        target = "engine::tree::payload_processor::sparse_trie",
642        skip_all
643    )]
644    fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
645        let account_updates =
646            if new { &mut self.new_account_updates } else { &mut self.account_updates };
647
648        let updates_len_before = account_updates.len();
649
650        self.trie.trie_mut().update_leaves(account_updates, |target, min_len| {
651            match self.fetched_account_targets.entry(target) {
652                Entry::Occupied(mut entry) => {
653                    if min_len < *entry.get() {
654                        entry.insert(min_len);
655                        self.pending_targets
656                            .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
657                    }
658                }
659                Entry::Vacant(entry) => {
660                    entry.insert(min_len);
661                    self.pending_targets
662                        .push_account_target(ProofV2Target::new(target).with_min_len(min_len));
663                }
664            }
665        })?;
666
667        let updates_len_after = account_updates.len();
668        self.account_cache_hits += (updates_len_before - updates_len_after) as u64;
669        self.account_cache_misses += updates_len_after as u64;
670
671        Ok(updates_len_after < updates_len_before)
672    }
673
    /// Computes storage roots for accounts whose storage updates are fully drained.
    ///
    /// For each storage trie T that:
    /// 1. has an entry in `storage_updates`,
    /// 2. all the storage updates are fully drained (the entry is empty),
    /// 3. but the storage root isn't cached yet,
    ///
    /// we trigger storage root computation on a rayon pool.
    ///
    /// # Panics
    ///
    /// Panics if computing the root of one of the selected storage tries fails.
    fn compute_drained_storage_roots(&mut self) {
        // Addresses whose tracked storage updates have all been applied.
        let addresses_to_compute_roots: Vec<_> = self
            .storage_updates
            .iter()
            .filter_map(|(address, updates)| updates.is_empty().then_some(*address))
            .collect();

        // Raw-pointer wrapper that lets us hand out mutable access to several distinct storage
        // tries to rayon tasks at the same time.
        struct SendStorageTriePtr<S>(*mut RevealableSparseTrie<S>);
        // SAFETY: this wrapper only forwards the pointer across rayon; deref invariants are
        // documented at the use site below.
        unsafe impl<S: Send> Send for SendStorageTriePtr<S> {}

        // Keep only the tries that exist and don't have their root cached yet.
        let mut tries_to_compute_roots: Vec<(B256, SendStorageTriePtr<S>)> =
            Vec::with_capacity(addresses_to_compute_roots.len());
        for address in addresses_to_compute_roots {
            if let Some(trie) = self.trie.storage_tries_mut().get_mut(&address) &&
                !trie.is_root_cached()
            {
                tries_to_compute_roots.push((address, SendStorageTriePtr(trie)));
            }
        }

        if tries_to_compute_roots.is_empty() {
            return;
        }

        let parent_span =
            debug_span!("compute_drained_storage_roots", n = tries_to_compute_roots.len());
        tries_to_compute_roots.into_par_iter().for_each(|(address, SendStorageTriePtr(trie))| {
            // The address is only attached to the span when TRACE level is enabled.
            let span = if tracing::enabled!(tracing::Level::TRACE) {
                debug_span!(
                    target: "engine::tree::payload_processor::sparse_trie",
                    parent: &parent_span,
                    "storage_root",
                    ?address
                )
            } else {
                debug_span!(
                    target: "engine::tree::payload_processor::sparse_trie",
                    parent: &parent_span,
                    "storage_root",
                )
            };
            let _enter = span.entered();
            // SAFETY:
            // - pointers are created from `storage_tries_mut().get_mut(address)` above;
            // - `addresses_to_compute_roots` comes from map iteration, so addresses are unique;
            // - we do not insert/remove entries between pointer collection and use, so pointers
            //   stay valid and map reallocation cannot occur;
            // - each pointer is consumed by at most one rayon task, so no aliasing mutable access.
            unsafe { (*trie).root().expect("updates are drained, trie should be revealed by now") };
        });
    }
735
736    /// Iterates through all storage tries for which all updates were processed, computes their
737    /// storage roots, and promotes corresponding pending account updates into proper leaf updates
738    /// for accounts trie.
739    #[instrument(
740        level = "trace",
741        target = "engine::tree::payload_processor::sparse_trie",
742        skip_all
743    )]
744    fn promote_pending_account_updates(&mut self) -> SparseTrieResult<()> {
745        self.process_leaf_updates(false)?;
746
747        if self.pending_account_updates.is_empty() {
748            return Ok(());
749        }
750
751        self.compute_drained_storage_roots();
752
753        loop {
754            let span = trace_span!("promote_updates", promoted = tracing::field::Empty).entered();
755            // Now handle pending account updates that can be upgraded to a proper update.
756            let account_rlp_buf = &mut self.account_rlp_buf;
757            let mut num_promoted = 0;
758            self.pending_account_updates.retain(|addr, account| {
759                if let Some(updates) = self.storage_updates.get(addr) {
760                    if !updates.is_empty() {
761                        // If account has pending storage updates, it is still pending.
762                        return true;
763                    } else if let Some(account) = account.take() {
764                        let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
765                        let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
766                        self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
767                        num_promoted += 1;
768                        return false;
769                    }
770                }
771
772                // Get the current account state either from the trie or from latest account update.
773                let trie_account = match self.account_updates.get(addr) {
774                    Some(LeafUpdate::Changed(encoded)) => {
775                        Some(encoded).filter(|encoded| !encoded.is_empty())
776                    }
777                    // Needs to be revealed first
778                    Some(LeafUpdate::Touched) => return true,
779                    None => self.trie.get_account_value(addr),
780                };
781
782                let trie_account = trie_account.map(|value| TrieAccount::decode(&mut &value[..]).expect("invalid account RLP"));
783
784                let (account, storage_root) = if let Some(account) = account.take() {
785                    // If account is Some(_) here it means it didn't have any storage updates
786                    // and we can fetch the storage root directly from the account trie.
787                    //
788                    // If it did have storage updates, we would've had processed it above when iterating over storage tries.
789                    let storage_root = trie_account.map(|account| account.storage_root).unwrap_or(EMPTY_ROOT_HASH);
790
791                    (account, storage_root)
792                } else {
793                    (trie_account.map(Into::into), self.trie.storage_root(addr).expect("account had storage updates that were applied to its trie, storage root must be revealed by now"))
794                };
795
796                let encoded = encode_account_leaf_value(account, storage_root, account_rlp_buf);
797                self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
798                num_promoted += 1;
799
800                false
801            });
802            span.record("promoted", num_promoted);
803            drop(span);
804
805            // Only exit when no new updates are processed.
806            //
807            // We need to keep iterating if any updates are being drained because that might
808            // indicate that more pending account updates can be promoted.
809            if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
810                break
811            }
812        }
813
814        Ok(())
815    }
816
817    fn dispatch_pending_targets(&mut self) {
818        if self.pending_targets.is_empty() {
819            return;
820        }
821
822        let _span = trace_span!("dispatch_pending_targets").entered();
823        let (targets, chunking_length) = self.pending_targets.take();
824        dispatch_with_chunking(
825            targets,
826            chunking_length,
827            self.chunk_size,
828            self.max_targets_for_chunking,
829            self.proof_worker_handle.has_multiple_idle_account_workers(),
830            self.proof_worker_handle.has_multiple_idle_storage_workers(),
831            MultiProofTargetsV2::chunks,
832            |proof_targets| {
833                if let Err(e) =
834                    self.proof_worker_handle.dispatch_account_multiproof(AccountMultiproofInput {
835                        targets: proof_targets,
836                        proof_result_sender: ProofResultContext::new(
837                            self.proof_result_tx.clone(),
838                            HashedPostState::default(),
839                            Instant::now(),
840                        ),
841                    })
842                {
843                    error!("failed to dispatch account multiproof: {e:?}");
844                }
845            },
846        );
847    }
848}
849
850/// RLP-encodes the account as a [`TrieAccount`] leaf value, or returns empty for deletions.
851fn encode_account_leaf_value(
852    account: Option<Account>,
853    storage_root: B256,
854    account_rlp_buf: &mut Vec<u8>,
855) -> Vec<u8> {
856    if account.is_none_or(|account| account.is_empty()) && storage_root == EMPTY_ROOT_HASH {
857        return Vec::new();
858    }
859
860    account_rlp_buf.clear();
861    account.unwrap_or_default().into_trie_account(storage_root).encode(account_rlp_buf);
862    account_rlp_buf.clone()
863}
864
865/// Proof targets accumulated for the next dispatch to proof workers, plus a running count.
866#[derive(Default)]
867struct PendingTargets {
868    /// Queued account and storage proof targets.
869    targets: MultiProofTargetsV2,
870    /// Total of queued account and storage targets; reset to zero by `take`.
871    len: usize,
872}
873
874impl PendingTargets {
875    /// Returns the number of pending targets.
876    const fn len(&self) -> usize {
877        self.len
878    }
879
880    /// Returns `true` if there are no pending targets.
881    const fn is_empty(&self) -> bool {
882        self.len == 0
883    }
884
885    /// Takes the pending targets, replacing with empty defaults.
886    fn take(&mut self) -> (MultiProofTargetsV2, usize) {
887        (std::mem::take(&mut self.targets), std::mem::take(&mut self.len))
888    }
889
890    /// Adds a target to the account targets.
891    fn push_account_target(&mut self, target: ProofV2Target) {
892        self.targets.account_targets.push(target);
893        self.len += 1;
894    }
895
896    /// Extends storage targets for the given address.
897    fn extend_storage_targets(&mut self, address: &B256, targets: Vec<ProofV2Target>) {
898        self.len += targets.len();
899        self.targets.storage_targets.entry(*address).or_default().extend(targets);
900    }
901}
902
903/// Messages for the sparse trie task, as emitted by `run_hashing_task` on its output channel.
904enum SparseTrieTaskMessage {
905    /// An already hashed state update that is ready to be processed.
906    HashedState(HashedPostState),
907    /// Prefetch proof targets (passed through directly).
908    PrefetchProofs(MultiProofTargetsV2),
909    /// Marks the end of the stream: all state updates have been received.
910    FinishedStateUpdates,
911}
912
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{keccak256, Address, B256, U256};
    use reth_provider::{
        providers::{OverlayBuilder, OverlayStateProviderFactory},
        test_utils::create_test_provider_factory,
        ChainSpecProvider,
    };
    use reth_trie_db::ChangesetCache;
    use reth_trie_parallel::proof_task::ProofTaskCtx;
    use reth_trie_sparse::ArenaParallelSparseTrie;

    /// A hashed state update sent to the hashing task must come out as a `HashedState` message
    /// with the same account and storage data, followed by `FinishedStateUpdates`, after which
    /// the output channel is closed.
    #[test]
    fn test_run_hashing_task_hashed_state_update_forwards() {
        let hashed_address = keccak256(Address::random());
        let hashed_slot = keccak256(U256::from(42).to_be_bytes::<32>());
        let slot_value = U256::from(999);
        let sent_account = Account { balance: U256::from(100), nonce: 1, bytecode_hash: None };

        let mut sent_storage = reth_trie::HashedStorage::new(false);
        sent_storage.storage.insert(hashed_slot, slot_value);

        let mut sent_state = HashedPostState::default();
        sent_state.accounts.insert(hashed_address, Some(sent_account));
        sent_state.storages.insert(hashed_address, sent_storage);

        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let (hashed_state_tx, hashed_state_rx) = crossbeam_channel::unbounded();
        let hashing_task = std::thread::spawn(move || {
            SparseTrieCacheTask::<ArenaParallelSparseTrie, ArenaParallelSparseTrie>::run_hashing_task(
                updates_rx,
                hashed_state_tx,
                MultiProofTaskMetrics::default(),
            );
        });

        updates_tx.send(StateRootMessage::HashedStateUpdate(sent_state)).unwrap();
        updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
        drop(updates_tx);

        // The first message carries the state exactly as it was sent.
        let received = match hashed_state_rx.recv().unwrap() {
            SparseTrieTaskMessage::HashedState(state) => state,
            _ => panic!("expected HashedState message"),
        };

        let received_account = received.accounts.get(&hashed_address).unwrap().unwrap();
        assert_eq!(received_account.balance, sent_account.balance);
        assert_eq!(received_account.nonce, sent_account.nonce);

        let received_storage = received.storages.get(&hashed_address).unwrap();
        assert_eq!(*received_storage.storage.get(&hashed_slot).unwrap(), slot_value);

        // The second message is the finish signal.
        let next_message = hashed_state_rx.recv().unwrap();
        assert!(matches!(next_message, SparseTrieTaskMessage::FinishedStateUpdates));

        // Nothing else follows: the sender side is gone once the task has returned.
        assert!(hashed_state_rx.recv().is_err());
        hashing_task.join().unwrap();
    }

    /// No account combined with the empty storage root encodes to an empty leaf value.
    #[test]
    fn test_encode_account_leaf_value_empty_account_and_empty_root_is_empty() {
        let sentinel = vec![0xAB];
        let mut scratch = sentinel.clone();

        let leaf = encode_account_leaf_value(None, EMPTY_ROOT_HASH, &mut scratch);

        assert!(leaf.is_empty());
        // The early return must leave the caller's buffer exactly as it was.
        assert_eq!(scratch, sentinel);
    }

    /// A non-empty account is encoded as `TrieAccount` RLP, and the scratch buffer ends up
    /// holding the same bytes as the returned leaf value.
    #[test]
    fn test_encode_account_leaf_value_non_empty_account_is_rlp() {
        let storage_root = B256::from([0x99; 32]);
        let account = Account {
            nonce: 7,
            balance: U256::from(42),
            bytecode_hash: Some(B256::from([0xAA; 32])),
        };
        // Stale bytes that the encoder is expected to replace.
        let mut scratch = vec![0x00, 0x01];

        let leaf = encode_account_leaf_value(Some(account), storage_root, &mut scratch);

        let decoded = TrieAccount::decode(&mut &leaf[..]).expect("valid account RLP");
        assert_eq!(decoded.nonce, 7);
        assert_eq!(decoded.balance, U256::from(42));
        assert_eq!(decoded.storage_root, storage_root);
        assert_eq!(scratch, leaf);
    }

    /// When the only message is the finish signal, the task reports the parent state root with
    /// empty trie updates and leaves the blind accounts trie unrevealed.
    #[test]
    fn run_returns_parent_root_without_revealing_blind_trie_when_no_state_updates() {
        let runtime = reth_tasks::Runtime::test();

        let provider_factory = create_test_provider_factory();
        let anchor_hash = provider_factory.chain_spec().genesis_hash();
        let overlay_builder = OverlayBuilder::<reth_chain_state::EthPrimitives>::new(
            anchor_hash,
            ChangesetCache::new(),
        );
        let overlay_factory = OverlayStateProviderFactory::new(provider_factory, overlay_builder);
        let proof_worker_handle =
            ProofWorkerHandle::new(&runtime, ProofTaskCtx::new(overlay_factory), false);

        let blind_trie = RevealableSparseTrie::blind_from(ConfigurableSparseTrie::Arena(
            ArenaParallelSparseTrie::default(),
        ));
        let trie = SparseStateTrie::default()
            .with_accounts_trie(blind_trie.clone())
            .with_default_storage_trie(blind_trie)
            .with_updates(true);

        let parent_state_root = B256::from([0x55; 32]);
        let (updates_tx, updates_rx) = crossbeam_channel::unbounded();
        let mut task = SparseTrieCacheTask::new_with_trie(
            &runtime,
            updates_rx,
            std::sync::mpsc::channel().0,
            proof_worker_handle,
            MultiProofTaskMetrics::default(),
            trie,
            parent_state_root,
            1,
        );

        // Finish right away without sending a single state update.
        updates_tx.send(StateRootMessage::FinishedStateUpdates).unwrap();
        drop(updates_tx);

        let outcome = task.run().expect("state root computation should succeed");

        assert_eq!(outcome.state_root, parent_state_root);
        assert!(outcome.trie_updates.is_empty());
        assert!(task.trie.state_trie_ref().is_none(), "blind trie should not be revealed");
    }
}