Skip to main content

reth_trie_sparse/arena/
mod.rs

1mod branch_child_idx;
2mod cursor;
3mod nodes;
4
5use branch_child_idx::{BranchChildIdx, BranchChildIter};
6use cursor::{ArenaCursor, NextResult, SeekResult};
7use nodes::{
8    ArenaSparseNode, ArenaSparseNodeBranch, ArenaSparseNodeBranchChild, ArenaSparseNodeState,
9};
10
11use crate::{LeafLookup, LeafLookupError, LeafUpdate, SparseTrie, SparseTrieUpdates};
12use alloc::{borrow::Cow, boxed::Box, collections::VecDeque, vec::Vec};
13use alloy_primitives::{
14    keccak256,
15    map::{B256Map, HashMap, HashSet},
16    B256,
17};
18use alloy_trie::{BranchNodeCompact, TrieMask};
19use core::{cmp::Reverse, mem};
20use reth_execution_errors::SparseTrieResult;
21use reth_trie_common::{
22    BranchNodeMasks, BranchNodeRef, ExtensionNodeRef, LeafNodeRef, Nibbles, ProofTrieNodeV2,
23    RlpNode, TrieNodeV2, EMPTY_ROOT_HASH,
24};
25use slotmap::{DefaultKey, SlotMap};
26use smallvec::SmallVec;
27use tracing::{instrument, trace};
28
29#[cfg(feature = "trie-debug")]
30use crate::debug_recorder::{LeafUpdateRecord, ProofTrieNodeRecord, RecordedOp, TrieDebugRecorder};
31
/// Alias for the slotmap key type used as node references throughout the arena trie.
type Index = DefaultKey;
/// Alias for the slotmap used as the node arena throughout the arena trie.
type NodeArena = SlotMap<Index, ArenaSparseNode>;

/// Tracing target used by all `trace!`/`instrument` events emitted from this module.
const TRACE_TARGET: &str = "trie::arena";

/// The maximum path length (in nibbles) for nodes that live in the upper trie. Nodes at this
/// depth or deeper belong to lower subtries.
const UPPER_TRIE_MAX_DEPTH: usize = 2;
42
43/// Finds the sub-range of `sorted_keys[start..]` whose entries start with `prefix`.
44///
45/// Returns the half-open range `start_idx..end_idx` into `sorted_keys`. The returned
46/// `end_idx` can be used as the `start` for the next call when iterating prefixes in
47/// lexicographic order.
48fn prefix_range(
49    sorted_keys: &[Nibbles],
50    start: usize,
51    prefix: &Nibbles,
52) -> core::ops::Range<usize> {
53    // Advance past entries before `prefix`.
54    let begin = start + sorted_keys[start..].partition_point(|p| p < prefix);
55    // Find the end of entries that start with `prefix`.
56    let mut end = begin;
57    while end < sorted_keys.len() && sorted_keys[end].starts_with(prefix) {
58        end += 1;
59    }
60    begin..end
61}
62
/// Returns the per-slot byte size used by `SlotMap<_, T>`. `SlotMap` wraps each value in a
/// `Slot<T>` containing the value union + a 4-byte version field, with struct alignment.
const fn slotmap_slot_size<T>() -> usize {
    // Layout mirrored from slotmap internals:
    //   Slot<T>      = { u: SlotUnion<T>, version: u32 }
    //   SlotUnion<T> = union { value: ManuallyDrop<T>, next_free: u32 }
    // so a slot occupies max(size_of::<T>(), 4) + 4 bytes, padded out to the slot's
    // alignment, which is max(align_of::<T>(), 4).
    let value_size = core::mem::size_of::<T>();
    let value_align = core::mem::align_of::<T>();
    let union_size = if value_size < 4 { 4 } else { value_size };
    let align = if value_align < 4 { 4 } else { value_align };
    let unpadded = union_size + 4;
    // Round `unpadded` up to the next multiple of `align` (always a power of two).
    (unpadded + align - 1) & !(align - 1)
}
74
75/// Compacts an arena by BFS-copying all reachable nodes into a fresh `SlotMap`, dropping
76/// unreachable (pruned) slots. Parents are stored before children for cache-friendly top-down
77/// traversal.
78fn compact_arena(arena: &mut NodeArena, root: &mut Index) {
79    let mut new_arena = SlotMap::with_capacity(arena.len());
80    let mut queue = VecDeque::new();
81
82    let root_node = arena.remove(*root).expect("root exists");
83    let new_root = new_arena.insert(root_node);
84    queue.push_back(new_root);
85
86    while let Some(new_idx) = queue.pop_front() {
87        // Invariant: any node popped from `queue` has been moved into `new_arena` but
88        // its Branch.children have not been rewritten yet — every Revealed(idx) here is
89        // still an old-arena index, and the child is still present in `arena` because
90        // only this parent's iteration can remove it (each child has exactly one parent).
91        let old_children: SmallVec<[(usize, Index); 16]> = match &new_arena[new_idx] {
92            ArenaSparseNode::Branch(b) => b
93                .children
94                .iter()
95                .enumerate()
96                .filter_map(|(i, c)| match c {
97                    ArenaSparseNodeBranchChild::Revealed(old_idx) => Some((i, *old_idx)),
98                    _ => None,
99                })
100                .collect(),
101            _ => continue,
102        };
103
104        for (child_pos, old_child_idx) in old_children {
105            let child_node = arena.remove(old_child_idx).expect("child exists");
106            let new_child_idx = new_arena.insert(child_node);
107            let ArenaSparseNode::Branch(b) = &mut new_arena[new_idx] else { unreachable!() };
108            b.children[child_pos] = ArenaSparseNodeBranchChild::Revealed(new_child_idx);
109            queue.push_back(new_child_idx);
110        }
111    }
112
113    debug_assert!(
114        arena.is_empty(),
115        "compact_arena: {} orphaned nodes remaining after BFS drain",
116        arena.len(),
117    );
118
119    *arena = new_arena;
120    *root = new_root;
121}
122
/// Reusable buffers shared by both [`ArenaSparseSubtrie`] and [`ArenaParallelSparseTrie`].
///
/// Kept on each subtrie and on the upper trie so repeated traversals and hash passes can
/// reuse existing allocations instead of reallocating per call.
#[derive(Debug, Default, Clone)]
struct ArenaTrieBuffers {
    /// Reusable cursor for trie traversals.
    cursor: ArenaCursor,
    /// Trie updates built up directly during hashing and structural changes. `Some` when
    /// tracking updates, `None` otherwise. Initialized alongside `updates` in `set_updates`.
    updates: Option<SparseTrieUpdates>,
    /// Reusable buffer for RLP encoding.
    rlp_buf: Vec<u8>,
    /// Reusable buffer for child `RlpNode`s during hashing.
    rlp_node_buf: Vec<RlpNode>,
}
136
137impl ArenaTrieBuffers {
138    fn clear(&mut self) {
139        if let Some(updates) = self.updates.as_mut() {
140            updates.clear();
141        }
142        self.rlp_buf.clear();
143        self.rlp_node_buf.clear();
144    }
145}
146
/// A subtrie within the arena-based parallel sparse trie.
///
/// Each subtrie owns its own arena, allowing parallel mutations across subtries.
#[derive(Debug, Clone)]
struct ArenaSparseSubtrie {
    /// The arena allocating nodes within this subtrie.
    arena: NodeArena,
    /// The root node of this subtrie.
    root: Index,
    /// The absolute path of this subtrie's root in the full trie.
    path: Nibbles,
    /// Reusable buffers for traversal, RLP encoding, and update actions.
    buffers: ArenaTrieBuffers,
    /// Reusable buffer for collecting required proofs during leaf updates.
    /// Each entry is `(index, proof)` where `index` is the position of the target in the
    /// `sorted_updates` slice passed to [`Self::update_leaves`].
    required_proofs: Vec<(usize, ArenaRequiredProof)>,
    /// Total number of revealed leaves in this subtrie.
    /// Maintained incrementally by mutations; cross-checked in debug builds by
    /// [`Self::debug_assert_counters`].
    num_leaves: u64,
    /// Number of dirty (modified since last hash) leaves in this subtrie.
    /// Maintained incrementally; reset to 0 by hashing and pruning.
    num_dirty_leaves: u64,
    /// Cached total memory footprint of this subtrie in bytes. Computed during prune;
    /// starts at 0 before the first prune.
    cached_memory_size: usize,
}
172
173impl ArenaSparseSubtrie {
174    /// Creates a new subtrie with a pre-allocated root slot containing
175    /// [`ArenaSparseNode::EmptyRoot`]. The caller must overwrite `subtrie.arena[subtrie.root]`
176    /// before use.
177    fn new(record_updates: bool) -> Box<Self> {
178        let mut arena = SlotMap::new();
179        let root = arena.insert(ArenaSparseNode::EmptyRoot);
180        let buffers = ArenaTrieBuffers {
181            updates: record_updates.then(SparseTrieUpdates::default),
182            ..Default::default()
183        };
184        Box::new(Self {
185            arena,
186            root,
187            path: Nibbles::default(),
188            buffers,
189            required_proofs: Vec::new(),
190            num_leaves: 0,
191            num_dirty_leaves: 0,
192            cached_memory_size: 0,
193        })
194    }
195
    /// Returns the cached total memory footprint of this subtrie in bytes.
    /// Accurate after prune; returns 0 before the first prune
    /// (the cache is recomputed by [`Self::prune`]).
    const fn memory_size(&self) -> usize {
        self.cached_memory_size
    }
201
    /// Asserts that `num_leaves` and `num_dirty_leaves` match the actual counts in the arena.
    ///
    /// Debug-builds only: recounts via `count_leaves_and_dirty` (which presumably walks the
    /// whole subtrie — too costly for release builds).
    #[cfg(debug_assertions)]
    fn debug_assert_counters(&self) {
        let (actual_leaves, actual_dirty) =
            ArenaParallelSparseTrie::count_leaves_and_dirty(&self.arena, self.root);
        debug_assert_eq!(
            self.num_leaves, actual_leaves,
            "subtrie {:?} num_leaves mismatch: stored {} vs actual {}",
            self.path, self.num_leaves, actual_leaves,
        );
        debug_assert_eq!(
            self.num_dirty_leaves, actual_dirty,
            "subtrie {:?} num_dirty_leaves mismatch: stored {} vs actual {}",
            self.path, self.num_dirty_leaves, actual_dirty,
        );
    }
218
    /// Prunes revealed subtrees that are not ancestors of any retained leaf, compacting the
    /// arena in a single BFS pass.
    ///
    /// `retained_leaves` must be sorted and scoped to this subtrie's key range. Builds a fresh
    /// arena by BFS-copying only retained nodes from the root, blinding non-retained children
    /// at the boundary. Non-retained subtrees are never visited — they are dropped with the
    /// old arena.
    ///
    /// Expects that all nodes have computed hashes (i.e. `prune` is called after hashing).
    ///
    /// Returns the number of nodes dropped from the arena.
    fn prune(&mut self, retained_leaves: &[Nibbles]) -> usize {
        // Only branches can have pruneable children.
        if !matches!(&self.arena[self.root], ArenaSparseNode::Branch(_)) {
            return 0;
        }

        debug_assert!(
            retained_leaves.windows(2).all(|w| w[0] <= w[1]),
            "retained_leaves must be sorted"
        );
        debug_assert_eq!(self.num_dirty_leaves, 0, "prune must run after hashing");

        let old_count = self.arena.len();
        // In a tree where every branch has ≥2 children, #branches ≤ #leaves − 1, so
        // total nodes ≤ 2N − 1. This is a reasonable upper-bound capacity hint that
        // avoids most reallocations without over-allocating when pruning is heavy.
        let mut new_arena = SlotMap::with_capacity(retained_leaves.len() * 2);
        // Queue: (new_idx, path TO the node — excluding its own short_key)
        let mut queue: VecDeque<(Index, Nibbles)> = VecDeque::new();
        // Counters rebuilt during the BFS: retained leaf count and total heap bytes held by
        // retained nodes (beyond their slot storage).
        let mut new_num_leaves = 0u64;
        let mut new_nodes_heap_size = 0usize;

        // Root is always retained.
        let root_node = self.arena.remove(self.root).expect("root exists");
        let new_root = new_arena.insert(root_node);
        queue.push_back((new_root, self.path));

        while let Some((new_idx, node_path)) = queue.pop_front() {
            new_nodes_heap_size += new_arena[new_idx].extra_heap_bytes();

            // Non-branch nodes have no children to consider; just count leaves.
            let ArenaSparseNode::Branch(b) = &new_arena[new_idx] else {
                if matches!(&new_arena[new_idx], ArenaSparseNode::Leaf { .. }) {
                    new_num_leaves += 1;
                }
                continue;
            };

            // Logical path of this branch (path TO node + its extension/short_key).
            let mut branch_logical_path = node_path;
            branch_logical_path.extend(&b.short_key);

            // Collect (dense_pos, nibble, old_child_idx) for revealed children.
            let children: SmallVec<[(usize, u8, Index); 16]> = BranchChildIter::new(b.state_mask)
                .filter_map(|(dense_idx, nibble)| match &b.children[dense_idx] {
                    ArenaSparseNodeBranchChild::Revealed(old_idx) => {
                        Some((dense_idx.get(), nibble, *old_idx))
                    }
                    _ => None,
                })
                .collect();

            for (child_pos, nibble, old_child_idx) in children {
                // Child's path in the trie (edges to reach it, excluding its own short_key).
                let mut child_path = branch_logical_path;
                child_path.push(nibble);

                // Child's full prefix for the retention check.
                let child_short_key = match &self.arena[old_child_idx] {
                    ArenaSparseNode::Branch(b) => &b.short_key,
                    ArenaSparseNode::Leaf { key, .. } => key,
                    other => unreachable!("subtrie prune: unexpected child node kind: {other:?}"),
                };
                let mut child_prefix = child_path;
                child_prefix.extend(child_short_key);

                if has_prefix(retained_leaves, &child_prefix) {
                    // Retained — move child to new arena.
                    let child_node = self.arena.remove(old_child_idx).expect("child exists");
                    let new_child_idx = new_arena.insert(child_node);
                    let ArenaSparseNode::Branch(b) = &mut new_arena[new_idx] else {
                        unreachable!()
                    };
                    b.children[child_pos] = ArenaSparseNodeBranchChild::Revealed(new_child_idx);
                    queue.push_back((new_child_idx, child_path));
                } else {
                    // Not retained — blind the child slot in the new arena.
                    let rlp_node = self.arena[old_child_idx]
                        .state_ref()
                        .expect("child must have state")
                        .cached_rlp_node()
                        .cloned()
                        .expect("pruned child must have cached RLP (prune runs after hashing)");
                    let ArenaSparseNode::Branch(b) = &mut new_arena[new_idx] else {
                        unreachable!()
                    };
                    b.children[child_pos] = ArenaSparseNodeBranchChild::Blinded(rlp_node);
                }
            }
        }

        let pruned = old_count - new_arena.len();
        self.num_leaves = new_num_leaves;
        self.num_dirty_leaves = 0;
        self.arena = new_arena;
        self.root = new_root;
        // Slot storage (capacity × per-slot size) plus the nodes' extra heap allocations.
        self.cached_memory_size =
            self.arena.capacity() * slotmap_slot_size::<ArenaSparseNode>() + new_nodes_heap_size;

        #[cfg(debug_assertions)]
        self.debug_assert_counters();
        // Explicit `return` because the nested `has_prefix` item follows it in this block.
        return pruned;

        /// Returns `true` if any entry in `sorted_keys` starts with `prefix`.
        fn has_prefix(sorted_keys: &[Nibbles], prefix: &Nibbles) -> bool {
            let idx = sorted_keys.binary_search(prefix).unwrap_or_else(|i| i);
            sorted_keys.get(idx).is_some_and(|p| p.starts_with(prefix))
        }
    }
336
    /// Applies leaf updates within this subtrie. Uses the same walk-down-with-cursor pattern as
    /// [`Self::reveal_nodes`], but checks accessibility for [`LeafUpdate::Touched`] entries.
    ///
    /// `sorted_updates` must be sorted lexicographically by their nibbles path (index 1).
    ///
    /// Any required proofs are appended to `self.required_proofs` and should be drained by the
    /// caller after this method returns.
    #[instrument(
        level = "trace",
        target = TRACE_TARGET,
        skip_all,
        fields(
            subtrie = ?self.path,
            num_updates = sorted_updates.len(),
        ),
    )]
    fn update_leaves(&mut self, sorted_updates: &[(B256, Nibbles, LeafUpdate)]) {
        if sorted_updates.is_empty() {
            return;
        }
        trace!(target: TRACE_TARGET, "Subtrie update_leaves");

        debug_assert!(
            !matches!(self.arena[self.root], ArenaSparseNode::EmptyRoot),
            "subtrie root must not be EmptyRoot at start of update_leaves"
        );

        self.buffers.cursor.reset(&self.arena, self.root, self.path);

        for (idx, &(key, ref full_path, ref update)) in sorted_updates.iter().enumerate() {
            let find_result = self.buffers.cursor.seek(&mut self.arena, full_path);

            // If the path hits a blinded node, request a proof regardless of update type.
            if matches!(find_result, SeekResult::Blinded) {
                let logical_len = self.buffers.cursor.head_logical_branch_path_len(&self.arena);
                // Ask for proof nodes one nibble deeper than the deepest known branch,
                // capped at the full 64-nibble key length.
                self.required_proofs.push((
                    idx,
                    ArenaRequiredProof { key, min_len: (logical_len as u8 + 1).min(64) },
                ));
                continue;
            }

            match update {
                LeafUpdate::Changed(value) if !value.is_empty() => {
                    // Upsert: insert or update a leaf with the given value.
                    let (_result, deltas) = ArenaParallelSparseTrie::upsert_leaf(
                        &mut self.arena,
                        &mut self.buffers.cursor,
                        &mut self.root,
                        full_path,
                        value,
                        find_result,
                    );
                    // Apply the signed counter deltas reported by the mutation.
                    self.num_leaves = (self.num_leaves as i64 + deltas.num_leaves_delta) as u64;
                    self.num_dirty_leaves =
                        (self.num_dirty_leaves as i64 + deltas.num_dirty_leaves_delta) as u64;
                }
                // An empty value means the leaf is being removed.
                LeafUpdate::Changed(_) => {
                    let (result, deltas) = ArenaParallelSparseTrie::remove_leaf(
                        &mut self.arena,
                        &mut self.buffers.cursor,
                        &mut self.root,
                        key,
                        full_path,
                        find_result,
                        &mut self.buffers.updates,
                    );
                    self.num_leaves = (self.num_leaves as i64 + deltas.num_leaves_delta) as u64;
                    self.num_dirty_leaves =
                        (self.num_dirty_leaves as i64 + deltas.num_dirty_leaves_delta) as u64;

                    // The branch collapse needs the blinded sibling revealed: queue proofs
                    // for both the sibling (`proof_key`) and the removal target itself,
                    // keyed by this update's index.
                    if let RemoveLeafResult::NeedsProof { key, proof_key, min_len } = result {
                        self.required_proofs
                            .push((idx, ArenaRequiredProof { key: proof_key, min_len }));
                        self.required_proofs.push((idx, ArenaRequiredProof { key, min_len }));
                    }
                }
                // Touched: accessibility was already verified by the blinded check above;
                // nothing to mutate.
                LeafUpdate::Touched => {}
            }
        }

        // Drain remaining cursor entries, propagating dirty state.
        self.buffers.cursor.drain(&mut self.arena);

        #[cfg(debug_assertions)]
        self.debug_assert_counters();
    }
424
425    /// Reveals nodes inside this subtrie. Uses [`ArenaCursor::seek`] to locate the ancestor
426    /// node, then replaces blinded children with the proof nodes.
427    fn reveal_nodes(&mut self, nodes: &mut [ProofTrieNodeV2]) -> SparseTrieResult<()> {
428        if nodes.is_empty() {
429            return Ok(());
430        }
431        trace!(target: TRACE_TARGET, path = ?self.path, num_nodes = nodes.len(), "Subtrie reveal_nodes");
432
433        debug_assert!(
434            !matches!(self.arena[self.root], ArenaSparseNode::EmptyRoot),
435            "subtrie root must not be EmptyRoot in reveal_nodes"
436        );
437
438        self.buffers.cursor.reset(&self.arena, self.root, self.path);
439
440        for node in nodes.iter_mut() {
441            let find_result = self.buffers.cursor.seek(&mut self.arena, &node.path);
442            if ArenaParallelSparseTrie::reveal_node(
443                &mut self.arena,
444                &self.buffers.cursor,
445                node,
446                find_result,
447            )
448            .is_some_and(|child_idx| matches!(self.arena[child_idx], ArenaSparseNode::Leaf { .. }))
449            {
450                self.num_leaves += 1;
451            }
452        }
453
454        // Drain remaining cursor entries, propagating dirty state.
455        self.buffers.cursor.drain(&mut self.arena);
456
457        #[cfg(debug_assertions)]
458        self.debug_assert_counters();
459
460        Ok(())
461    }
462
    /// Computes and caches `RlpNode` for all dirty nodes via iterative post-order DFS.
    /// After this call every node reachable from `self.root` will be in `Cached` state.
    ///
    /// Trie updates are written directly to `self.buffers.updates` (if `Some`).
    fn update_cached_rlp(&mut self) {
        // Delegates to the shared implementation, lending out this subtrie's reusable buffers.
        ArenaParallelSparseTrie::update_cached_rlp(
            &mut self.arena,
            self.root,
            &mut self.buffers.cursor,
            &mut self.buffers.rlp_buf,
            &mut self.buffers.rlp_node_buf,
            self.path,
            &mut self.buffers.updates,
        );
        // Everything reachable is now cached, so no leaves remain dirty.
        self.num_dirty_leaves = 0;
        #[cfg(debug_assertions)]
        self.debug_assert_counters();
    }
481}
482
/// Tracks the net change in leaf counters caused by a trie mutation (upsert or removal).
/// Returned alongside [`UpsertLeafResult`] / [`RemoveLeafResult`] so the caller can maintain
/// aggregate counters on [`ArenaSparseSubtrie`] without scanning the arena.
#[derive(Debug, Default)]
struct SubtrieCounterDeltas {
    /// Signed change to apply to [`ArenaSparseSubtrie::num_leaves`].
    num_leaves_delta: i64,
    /// Signed change to apply to [`ArenaSparseSubtrie::num_dirty_leaves`].
    num_dirty_leaves_delta: i64,
}
491
/// Result of `upsert_leaf` indicating whether a new child was created that the caller
/// may need to wrap as a subtrie (in the upper trie).
///
/// NOTE(review): [`ArenaSparseSubtrie::update_leaves`] discards this result (`_result`); it
/// appears to matter only for upper-trie callers — confirm at the call sites.
#[derive(Debug)]
enum UpsertLeafResult {
    /// A leaf was updated in place (no structural change).
    Updated,
    /// A new leaf was created (e.g. EmptyRoot→Leaf, or root-level split).
    NewLeaf,
    /// A new child (branch or leaf) was created or inserted. The child is the cursor head
    /// and its parent is the cursor's parent.
    NewChild,
}
504
/// Result of `remove_leaf` indicating whether a proof is needed to complete a branch
/// collapse.
#[derive(Debug)]
enum RemoveLeafResult {
    /// No proof needed — the removal (and any collapse) completed fully.
    Removed,
    /// No leaf was found at the given path (no-op).
    NotFound,
    /// The branch collapse requires revealing a blinded sibling. The caller must request a
    /// proof for the given key at the given minimum depth.
    /// `update_leaves` queues proof requests for both `proof_key` (the sibling) and `key`
    /// (the removal target).
    NeedsProof { key: B256, proof_key: B256, min_len: u8 },
}
517
/// A proof request generated during leaf updates when a blinded node is encountered.
#[derive(Debug, Clone)]
struct ArenaRequiredProof {
    /// The key requiring a proof.
    key: B256,
    /// Minimum depth at which proof nodes should be returned. Capped at 64 nibbles (the
    /// full key length) by [`ArenaSparseSubtrie::update_leaves`].
    min_len: u8,
}
526
/// Configuration for controlling when parallelism is enabled in [`ArenaParallelSparseTrie`]
/// operations.
///
/// Each threshold gates one operation: work below a threshold is done inline/serially,
/// work at or above it is dispatched in parallel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ArenaParallelismThresholds {
    /// Minimum number of dirty leaves in a subtrie before it is eligible for parallel hash
    /// computation. Subtries with fewer dirty leaves than this are hashed serially during
    /// [`ArenaParallelSparseTrie::update_subtrie_hashes`].
    pub min_dirty_leaves: u64,
    /// Minimum number of nodes to reveal in a subtrie before it is eligible for parallel
    /// reveal. Subtries with fewer nodes to reveal than this are revealed inline during the
    /// upper trie walk.
    pub min_revealed_nodes: usize,
    /// Minimum number of leaf updates targeting a subtrie before it is eligible for parallel
    /// update. Subtries with fewer updates than this are updated inline during the upper trie
    /// walk.
    pub min_updates: usize,
    /// Minimum number of revealed leaves in a subtrie before it is eligible for parallel
    /// pruning. Subtries with fewer leaves than this are pruned inline during the upper trie
    /// walk.
    pub min_leaves_for_prune: u64,
}
550
551impl Default for ArenaParallelismThresholds {
552    fn default() -> Self {
553        Self {
554            min_dirty_leaves: 64,
555            min_revealed_nodes: 16,
556            min_updates: 128,
557            min_leaves_for_prune: 128,
558        }
559    }
560}
561
/// An arena-based sparse trie whose subtries can be mutated in parallel.
///
/// ## Structure
///
/// Uses arena allocation ([`slotmap::SlotMap`]) for node storage with direct index-based child
/// pointers, avoiding the per-node hashing overhead of a `HashMap`-based trie. The trie is split
/// into two tiers:
///
/// - **Upper trie** (`upper_arena`): Contains nodes whose path is shorter than
///   `UPPER_TRIE_MAX_DEPTH` nibbles. These are the root and its immediate children.
/// - **Lower subtries** (`ArenaSparseSubtrie`): Each child of an upper-trie branch at the depth
///   boundary becomes the root of its own subtrie, stored as an `ArenaSparseNode::Subtrie` child in
///   the upper arena. Each subtrie owns its own arena, enabling lock-free parallel mutation.
///
/// Node placement is determined by path length (not counting a branch's short key):
///
/// - Paths with **< `UPPER_TRIE_MAX_DEPTH`** nibbles live in `upper_arena`.
/// - Paths with **≥ `UPPER_TRIE_MAX_DEPTH`** nibbles live in a subtrie.
///
/// ## Node Revealing
///
/// Nodes are lazily revealed from proof data via [`SparseTrie::reveal_nodes`]. Each node is
/// placed into the upper arena or delegated to its subtrie based on path depth. Unrevealed
/// children are stored as `ArenaSparseNodeBranchChild::Blinded` with their RLP encoding.
/// When multiple subtries have pending reveals, they are processed in parallel using rayon
/// (controlled by [`ArenaParallelismThresholds::min_revealed_nodes`]).
///
/// ## Leaf Operations
///
/// Leaf updates and removals are applied via [`SparseTrie::update_leaves`]. The method walks
/// the upper trie to route each update to the correct subtrie, then processes subtries in
/// parallel when the update count exceeds [`ArenaParallelismThresholds::min_updates`].
///
/// [`SparseTrie::update_leaf`] and [`SparseTrie::remove_leaf`] are not yet implemented.
///
/// After updates, structural changes (branch collapse, subtrie unwrapping) are handled by
/// propagating dirty state back up through the upper trie.
///
/// ## Root Hash Calculation
///
/// Root hash computation follows a bottom-up approach:
///
/// 1. **[`SparseTrie::update_subtrie_hashes`]**: Takes dirty subtries from the upper arena and
///    hashes them in parallel (when dirty leaf count meets
///    [`ArenaParallelismThresholds::min_dirty_leaves`]), then walks the upper trie to restore
///    hashed subtries and inline-hash any remaining dirty nodes.
/// 2. **[`SparseTrie::root`]**: Calls `update_subtrie_hashes`, then RLP-encodes the full upper trie
///    depth-first to produce the root hash.
///
/// Each node tracks its state via `ArenaSparseNodeState` (`Revealed`, `Cached`, or `Dirty`)
/// so only modified subtrees are recomputed.
///
/// ## Pruning
///
/// [`SparseTrie::prune`] removes revealed nodes that are not ancestors of any retained leaf.
/// Pruned nodes are replaced with `ArenaSparseNodeBranchChild::Blinded` entries using their
/// cached RLP. Subtries are pruned in parallel when their leaf count exceeds
/// [`ArenaParallelismThresholds::min_leaves_for_prune`].
#[derive(Debug, Clone)]
pub struct ArenaParallelSparseTrie {
    /// The arena allocating nodes in the upper trie (paths shorter than
    /// `UPPER_TRIE_MAX_DEPTH` nibbles).
    upper_arena: NodeArena,
    /// The root node of the upper trie.
    root: Index,
    /// Optional tracking of trie updates for database persistence. `Some` when update
    /// tracking is enabled, `None` otherwise.
    updates: Option<SparseTrieUpdates>,
    /// Reusable buffers for traversal, RLP encoding, and update actions.
    buffers: ArenaTrieBuffers,
    /// Thresholds controlling when parallelism is enabled for different operations.
    parallelism_thresholds: ArenaParallelismThresholds,
    /// Debug recorder for tracking mutating operations.
    #[cfg(feature = "trie-debug")]
    debug_recorder: TrieDebugRecorder,
}
636
637impl ArenaParallelSparseTrie {
    /// Sets the thresholds that control when parallelism is used during operations.
    ///
    /// Builder-style: consumes `self` and returns it with the new thresholds applied.
    pub const fn with_parallelism_thresholds(
        mut self,
        thresholds: ArenaParallelismThresholds,
    ) -> Self {
        self.parallelism_thresholds = thresholds;
        self
    }
646
647    /// Resets the debug recorder and records the current trie state as `SetRoot` + `RevealNodes`
648    /// ops, representing the initial state at the beginning of a block (after pruning).
649    ///
650    /// Walks the upper arena and all subtries depth-first using the cursor, converting each
651    /// node into a [`crate::debug_recorder::ProofTrieNodeRecord`].
652    #[cfg(feature = "trie-debug")]
653    fn record_initial_state(&mut self) {
654        use crate::debug_recorder::{NodeStateRecord, TrieNodeRecord};
655        use alloy_primitives::hex;
656        use alloy_trie::nodes::{BranchNode, TrieNode};
657
658        fn state_to_record(state: &ArenaSparseNodeState) -> NodeStateRecord {
659            match state {
660                ArenaSparseNodeState::Revealed => NodeStateRecord::Revealed,
661                ArenaSparseNodeState::Cached { rlp_node } => {
662                    NodeStateRecord::Cached { rlp_node: hex::encode(rlp_node.as_ref()) }
663                }
664                ArenaSparseNodeState::Dirty => NodeStateRecord::Dirty,
665            }
666        }
667
668        /// Converts an [`ArenaSparseNode`] into a [`ProofTrieNodeRecord`] at the given path.
669        /// For branch children, resolves revealed children's cached RLP from `arena`.
670        /// Returns `None` for subtrie/taken-subtrie nodes (handled separately).
671        fn node_to_record(
672            arena: &NodeArena,
673            idx: Index,
674            path: Nibbles,
675        ) -> Option<ProofTrieNodeRecord> {
676            match &arena[idx] {
677                ArenaSparseNode::EmptyRoot => Some(ProofTrieNodeRecord {
678                    path,
679                    node: TrieNodeRecord(TrieNode::EmptyRoot),
680                    masks: None,
681                    short_key: None,
682                    state: None,
683                }),
684                ArenaSparseNode::Branch(b) => {
685                    let stack = b
686                        .children
687                        .iter()
688                        .map(|child| match child {
689                            ArenaSparseNodeBranchChild::Blinded(rlp) => rlp.clone(),
690                            ArenaSparseNodeBranchChild::Revealed(child_idx) => {
691                                // After pruning / root(), all nodes have cached RLP.
692                                arena[*child_idx]
693                                    .state_ref()
694                                    .and_then(|s| s.cached_rlp_node())
695                                    .cloned()
696                                    .unwrap_or_default()
697                            }
698                        })
699                        .collect();
700                    Some(ProofTrieNodeRecord {
701                        path,
702                        node: TrieNodeRecord(TrieNode::Branch(BranchNode::new(
703                            stack,
704                            b.state_mask,
705                        ))),
706                        masks: Some((
707                            b.branch_masks.hash_mask.get(),
708                            b.branch_masks.tree_mask.get(),
709                        )),
710                        short_key: (!b.short_key.is_empty()).then_some(b.short_key),
711                        state: Some(state_to_record(&b.state)),
712                    })
713                }
714                ArenaSparseNode::Leaf { key, value, state, .. } => Some(ProofTrieNodeRecord {
715                    path,
716                    node: TrieNodeRecord(TrieNode::Leaf(alloy_trie::nodes::LeafNode::new(
717                        *key,
718                        value.clone(),
719                    ))),
720                    masks: None,
721                    short_key: None,
722                    state: Some(state_to_record(state)),
723                }),
724                ArenaSparseNode::Subtrie(_) | ArenaSparseNode::TakenSubtrie => None,
725            }
726        }
727
728        /// Walks an arena depth-first using `cursor` and collects all nodes as records.
729        fn collect_records(
730            arena: &mut NodeArena,
731            root: Index,
732            root_path: Nibbles,
733            cursor: &mut ArenaCursor,
734            result: &mut Vec<ProofTrieNodeRecord>,
735        ) {
736            cursor.reset(arena, root, root_path);
737
738            // The cursor starts with root on the stack but `next` only yields children.
739            if let Some(record) = node_to_record(arena, root, root_path) {
740                result.push(record);
741            }
742
743            loop {
744                match cursor.next(arena, |_, node| {
745                    matches!(node, ArenaSparseNode::Branch(_) | ArenaSparseNode::Leaf { .. })
746                }) {
747                    NextResult::Done => break,
748                    NextResult::Branch | NextResult::NonBranch => {
749                        let head = cursor.head().expect("cursor is non-empty");
750                        if let Some(record) = node_to_record(arena, head.index, head.path) {
751                            result.push(record);
752                        }
753                    }
754                }
755            }
756        }
757
758        let mut nodes = Vec::new();
759
760        // Collect from the upper arena.
761        collect_records(
762            &mut self.upper_arena,
763            self.root,
764            Nibbles::default(),
765            &mut self.buffers.cursor,
766            &mut nodes,
767        );
768
769        // Collect from all subtries.
770        for (_, node) in &mut self.upper_arena {
771            if let ArenaSparseNode::Subtrie(subtrie) = node {
772                collect_records(
773                    &mut subtrie.arena,
774                    subtrie.root,
775                    subtrie.path,
776                    &mut self.buffers.cursor,
777                    &mut nodes,
778                );
779            }
780        }
781
782        // Reset the recorder and record that we pruned, then the initial state.
783        self.debug_recorder.reset();
784        self.debug_recorder.record(RecordedOp::Prune);
785
786        // First node is the root → SetRoot, remaining → RevealNodes.
787        if let Some(root_record) = nodes.first() {
788            self.debug_recorder.record(RecordedOp::SetRoot { node: root_record.clone() });
789        }
790        if nodes.len() > 1 {
791            self.debug_recorder.record(RecordedOp::RevealNodes { nodes: nodes[1..].to_vec() });
792        }
793    }
794
795    /// Returns `true` if a node at the given path length should be placed in a subtrie rather
796    /// than the upper arena.
797    const fn should_be_subtrie(path_len: usize) -> bool {
798        path_len == UPPER_TRIE_MAX_DEPTH
799    }
800
    /// If the child at the cursor head should be a subtrie based on its depth, wraps it
    /// in [`ArenaSparseNode::Subtrie`].
    ///
    /// The child must be the cursor head and its parent the cursor's parent.
    fn maybe_wrap_in_subtrie(&mut self, child_idx: Index, child_path: &Nibbles) {
        // Only nodes sitting exactly at the boundary depth are wrapped.
        if !Self::should_be_subtrie(child_path.len()) {
            return;
        }

        // Only branch and leaf nodes can become subtrie roots.
        if !matches!(
            self.upper_arena[child_idx],
            ArenaSparseNode::Branch(_) | ArenaSparseNode::Leaf { .. }
        ) {
            return;
        }

        trace!(target: TRACE_TARGET, ?child_path, "Wrapping child into subtrie");
        let mut subtrie = ArenaSparseSubtrie::new(self.updates.is_some());
        subtrie.path = *child_path;
        // Take the node out of the upper arena, leaving a `TakenSubtrie` placeholder
        // in its slot until the assembled subtrie is written back below.
        let mut root_node =
            mem::replace(&mut self.upper_arena[child_idx], ArenaSparseNode::TakenSubtrie);

        // Migrate any revealed children from the upper arena into the subtrie arena.
        // Each child's index is rewritten to its new location in `subtrie.arena`.
        if let ArenaSparseNode::Branch(b) = &mut root_node {
            for child in &mut b.children {
                if let ArenaSparseNodeBranchChild::Revealed(idx) = child {
                    *idx =
                        Self::migrate_nodes(&mut subtrie.arena, &mut self.upper_arena, *idx, None);
                }
            }
        }

        subtrie.arena[subtrie.root] = root_node;
        // Recompute the leaf counters for the freshly assembled subtrie.
        let (leaves, dirty) = Self::count_leaves_and_dirty(&subtrie.arena, subtrie.root);
        subtrie.num_leaves = leaves;
        subtrie.num_dirty_leaves = dirty;
        #[cfg(debug_assertions)]
        subtrie.debug_assert_counters();
        self.upper_arena[child_idx] = ArenaSparseNode::Subtrie(subtrie);
    }
842
843    /// If the cursor head is a branch, wraps any revealed children that sit at
844    /// the subtrie boundary depth (`UPPER_TRIE_MAX_DEPTH`). This is needed after
845    /// structural changes like root-level splits or subtrie unwraps that can place
846    /// non-subtrie nodes at the boundary depth.
847    fn maybe_wrap_branch_children(&mut self, cursor: &ArenaCursor) {
848        let head = cursor.head().expect("cursor is non-empty");
849        let head_idx = head.index;
850        let head_path = head.path;
851
852        let ArenaSparseNode::Branch(b) = &self.upper_arena[head_idx] else { return };
853        let short_key = b.short_key;
854        let children: SmallVec<[_; 4]> = b
855            .child_iter()
856            .filter_map(|(nibble, child)| match child {
857                ArenaSparseNodeBranchChild::Revealed(idx) => Some((nibble, *idx)),
858                ArenaSparseNodeBranchChild::Blinded(_) => None,
859            })
860            .collect();
861
862        for (nibble, child_idx) in children {
863            let mut child_path = head_path;
864            child_path.extend(&short_key);
865            child_path.push_unchecked(nibble);
866            self.maybe_wrap_in_subtrie(child_idx, &child_path);
867        }
868    }
869
    /// Checks whether the subtrie at the cursor head has become empty after updates.
    /// If the subtrie's root is [`ArenaSparseNode::EmptyRoot`] (all leaves were removed), the
    /// child slot is removed from the parent branch entirely, the subtrie is recycled, and
    /// if the parent is left with a single revealed child, it is collapsed via
    /// `collapse_branch`.
    ///
    /// The subtrie must be the cursor head and its parent the cursor's parent.
    /// Pops the subtrie entry (propagating leaf count deltas) before returning.
    #[instrument(
        level = "trace",
        target = TRACE_TARGET,
        skip_all,
        fields(subtrie_path = ?cursor.head().expect("cursor is non-empty").path),
    )]
    fn maybe_unwrap_subtrie(&mut self, cursor: &mut ArenaCursor) {
        let subtrie_idx = cursor.head().expect("cursor is non-empty").index;

        // Nothing to do unless the head actually is a subtrie…
        let ArenaSparseNode::Subtrie(subtrie) = &self.upper_arena[subtrie_idx] else {
            return;
        };

        // …whose root is empty, i.e. every leaf under it was removed.
        if !matches!(subtrie.arena[subtrie.root], ArenaSparseNode::EmptyRoot) {
            return;
        }

        // The last nibble of the subtrie path identifies its slot in the parent branch.
        let child_nibble = cursor
            .head()
            .expect("cursor is non-empty")
            .path
            .last()
            .expect("subtrie path must have at least one nibble");
        let parent_idx = cursor.parent().expect("cursor has parent").index;

        // Pop the subtrie entry before mutating, so collapse_branch sees the parent as
        // the cursor head.
        cursor.pop(&mut self.upper_arena);

        // Merge the subtrie's buffered updates into the parent buffer and drop it.
        self.recycle_subtrie_from_idx(subtrie_idx);

        trace!(target: TRACE_TARGET, "Unwrapping empty subtrie, removing child slot");
        let parent_branch = self.upper_arena[parent_idx].branch_mut();
        let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
            .expect("child nibble not found in parent state_mask");

        parent_branch.children.remove(child_idx.get());
        parent_branch.unset_child_bit(child_nibble);
        // The branch structure changed (child removed), so any cached RLP is stale.
        parent_branch.state = parent_branch.state.to_dirty();

        // The parent may now have 0 or 1 children; cascade the structural fix-up.
        self.maybe_collapse_or_remove_branch(cursor);
    }
921
922    /// Merges buffered updates from a [`ArenaSparseNode::Subtrie`] and drops it.
923    ///
924    /// # Panics
925    ///
926    /// Panics if `node` is not a `Subtrie`.
927    fn recycle_subtrie(&mut self, node: ArenaSparseNode) {
928        let ArenaSparseNode::Subtrie(mut subtrie) = node else {
929            unreachable!("recycle_subtrie called on non-Subtrie node")
930        };
931        Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
932    }
933
934    /// Removes a [`ArenaSparseNode::Subtrie`] from the upper arena at `idx` and recycles it.
935    fn recycle_subtrie_from_idx(&mut self, idx: Index) {
936        let node = self.upper_arena.remove(idx).expect("subtrie exists in arena");
937        self.recycle_subtrie(node);
938    }
939
    /// Handles cascading structural changes on the branch at the cursor head after a child
    /// has been removed.
    ///
    /// Depending on the remaining child count:
    /// - **0 children**: the branch becomes `EmptyRoot` (if root) or is removed from its parent,
    ///   cascading upward.
    /// - **1 child**: collapses the branch into its sole child, unless that child is a
    ///   `TakenSubtrie` (deferred) or blinded. If the remaining child is an empty subtrie, it is
    ///   also removed, reducing to the 0-children case.
    /// - **2+ children**: nothing to do.
    fn maybe_collapse_or_remove_branch(&mut self, cursor: &mut ArenaCursor) {
        // Each iteration handles one branch; removals cascade upward by popping the
        // cursor to the parent and `continue`-ing.
        loop {
            let branch_idx = cursor.head().expect("cursor is non-empty").index;

            // Read-only phase: extract the count and remaining-child info we need before
            // mutating. All values here are Copy so the borrow is released.
            let count = {
                let ArenaSparseNode::Branch(b) = &self.upper_arena[branch_idx] else {
                    return;
                };
                b.state_mask.count_bits()
            };

            if count >= 2 {
                return;
            }

            if count == 0 {
                if branch_idx == self.root {
                    // An empty root branch degenerates to the canonical empty trie.
                    self.upper_arena[branch_idx] = ArenaSparseNode::EmptyRoot;
                    return;
                }
                // Remove the empty branch from its parent.
                let branch_nibble = cursor
                    .head()
                    .expect("cursor is non-empty")
                    .path
                    .last()
                    .expect("non-root branch");
                cursor.pop(&mut self.upper_arena);
                self.upper_arena.remove(branch_idx);
                let parent_idx = cursor.head().expect("cursor is non-empty").index;
                let parent_branch = self.upper_arena[parent_idx].branch_mut();
                let child_idx = BranchChildIdx::new(parent_branch.state_mask, branch_nibble)
                    .expect("child nibble not found in parent state_mask");
                parent_branch.children.remove(child_idx.get());
                parent_branch.unset_child_bit(branch_nibble);
                // The parent lost a child, so any cached RLP is stale.
                parent_branch.state = parent_branch.state.to_dirty();
                continue; // re-check the parent
            }

            // count == 1 — determine what kind of child remains. With a single bit set,
            // the sole child occupies slot 0 of `children`.
            let (remaining_nibble, remaining_child_idx) = {
                let b = self.upper_arena[branch_idx].branch_ref();
                let nibble = b.state_mask.iter().next().expect("branch has at least one child");
                let child_idx = match &b.children[0] {
                    ArenaSparseNodeBranchChild::Revealed(idx) => Some(*idx),
                    ArenaSparseNodeBranchChild::Blinded(_) => None,
                };
                (nibble, child_idx)
            };

            let Some(child_idx) = remaining_child_idx else {
                debug_assert!(false, "single remaining child is blinded — should have been caught by check_subtrie_collapse_needs_proof");
                return;
            };

            if matches!(self.upper_arena[child_idx], ArenaSparseNode::TakenSubtrie) {
                // Subtrie hasn't been restored yet; collapse is deferred to the
                // post-restore phase.
                return;
            }

            // Check if the remaining child is an empty subtrie that should also be removed.
            let is_empty_subtrie = matches!(
                &self.upper_arena[child_idx],
                ArenaSparseNode::Subtrie(s) if matches!(s.arena[s.root], ArenaSparseNode::EmptyRoot)
            );

            if is_empty_subtrie {
                self.recycle_subtrie_from_idx(child_idx);
                let branch = self.upper_arena[branch_idx].branch_mut();
                branch.children.remove(0);
                branch.unset_child_bit(remaining_nibble);
                branch.state = branch.state.to_dirty();
                continue; // now count == 0, will be handled next iteration
            }

            // Normal collapse: the remaining child is a Leaf, Branch, or non-empty Subtrie.
            Self::collapse_branch(
                &mut self.upper_arena,
                cursor,
                &mut self.root,
                &mut self.buffers.updates,
            );

            // After collapse, the remaining child (now at cursor head) may be a
            // Subtrie whose path was shortened by the collapsed branch's prefix. Since
            // should_be_subtrie requires path_len == UPPER_TRIE_MAX_DEPTH and the collapse
            // made the path shorter, the subtrie is no longer eligible — unwrap it.
            let child_idx = cursor.head().expect("cursor is non-empty").index;
            if let ArenaSparseNode::Subtrie(_) = &self.upper_arena[child_idx] {
                let ArenaSparseNode::Subtrie(mut subtrie) =
                    mem::replace(&mut self.upper_arena[child_idx], ArenaSparseNode::TakenSubtrie)
                else {
                    unreachable!()
                };
                // Move the subtrie's nodes back into the upper arena and merge its
                // buffered updates into ours.
                Self::migrate_nodes(
                    &mut self.upper_arena,
                    &mut subtrie.arena,
                    subtrie.root,
                    Some(child_idx),
                );
                Self::merge_subtrie_updates(
                    &mut self.buffers.updates,
                    &mut subtrie.buffers.updates,
                );

                // The migrated subtrie root may be a branch whose children now live in
                // the upper arena at or beyond the subtrie boundary depth. Re-wrap any
                // such children as subtries.
                self.maybe_wrap_branch_children(cursor);
            }
            return;
        }
    }
1066
1067    /// Merges updates from a subtrie's buffer into the parent's buffer.
1068    /// Both `dst` and `src` must be `Some` when updates are being tracked.
1069    ///
1070    /// Source removals cancel destination insertions (and vice versa) so that
1071    /// updates accumulated across multiple `root()` calls within a single block
1072    /// stay consistent.
1073    fn merge_subtrie_updates(
1074        dst: &mut Option<SparseTrieUpdates>,
1075        src: &mut Option<SparseTrieUpdates>,
1076    ) {
1077        if let Some(dst_updates) = dst.as_mut() {
1078            let src_updates = src.as_mut().expect("updates are enabled");
1079            debug_assert!(!src_updates.wiped, "subtrie updates should never have wiped=true");
1080
1081            // Source insertions cancel destination removals.
1082            for path in src_updates.updated_nodes.keys() {
1083                dst_updates.removed_nodes.remove(path);
1084            }
1085            dst_updates.updated_nodes.extend(src_updates.updated_nodes.drain());
1086
1087            // Source removals cancel destination insertions.
1088            for path in &src_updates.removed_nodes {
1089                dst_updates.updated_nodes.remove(path);
1090            }
1091            dst_updates.removed_nodes.extend(src_updates.removed_nodes.drain());
1092        }
1093    }
1094
1095    /// Right-pads a nibble path with zeros and packs it into a [`B256`].
1096    fn nibbles_to_padded_b256(path: &Nibbles) -> B256 {
1097        let mut bytes = [0u8; 32];
1098        path.pack_to(&mut bytes);
1099        B256::from(bytes)
1100    }
1101
1102    /// Returns the [`BranchNodeMasks`] for a branch based on the status of its children.
1103    fn get_branch_masks(arena: &NodeArena, branch: &ArenaSparseNodeBranch) -> BranchNodeMasks {
1104        let mut masks = BranchNodeMasks::default();
1105
1106        for (nibble, child) in branch.child_iter() {
1107            let (hash_bit, tree_bit) = match child {
1108                ArenaSparseNodeBranchChild::Blinded(_) => (
1109                    branch.branch_masks.hash_mask.is_bit_set(nibble),
1110                    branch.branch_masks.tree_mask.is_bit_set(nibble),
1111                ),
1112                ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1113                    let child = &arena[*child_idx];
1114                    (child.hash_mask_bit(), child.tree_mask_bit())
1115                }
1116            };
1117
1118            masks.set_child_bits(nibble, hash_bit, tree_bit);
1119        }
1120
1121        masks
1122    }
1123
    /// Computes and caches `RlpNode` for all dirty nodes reachable from `root` in `arena`.
    ///
    /// Uses the cursor's stack to walk dirty branches depth-first. For each branch,
    /// children are iterated left-to-right:
    /// - Blinded, cached, leaf, and `EmptyRoot` children have their `RlpNode` pushed directly onto
    ///   `rlp_node_buf`.
    /// - Dirty branch children are pushed onto `stack` and processed recursively first.
    ///
    /// When a dirty branch child finishes and is popped, the parent resumes iteration after
    /// the child's nibble. Once all children of a branch are processed, the branch is encoded
    /// via `BranchNodeRef` using the last N entries on `rlp_node_buf`, then replaced with a
    /// single result `RlpNode`.
    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, fields(base_path = ?base_path), ret)]
    fn update_cached_rlp(
        arena: &mut NodeArena,
        root: Index,
        cursor: &mut ArenaCursor,
        rlp_buf: &mut Vec<u8>,
        rlp_node_buf: &mut Vec<RlpNode>,
        base_path: Nibbles,
        updates: &mut Option<SparseTrieUpdates>,
    ) -> RlpNode {
        rlp_node_buf.clear();

        // Step 1: Handle trivial roots that don't need the stack-based walk.
        // EmptyRoot has no state to update. Leaves are encoded in place. Already-cached
        // branches need no work. Only dirty branches enter the main loop below.
        match &arena[root] {
            ArenaSparseNode::EmptyRoot => return RlpNode::word_rlp(&EMPTY_ROOT_HASH),
            ArenaSparseNode::Leaf { .. } => {
                Self::encode_leaf(arena, root, rlp_buf, rlp_node_buf);
                return rlp_node_buf.pop().expect("encode_leaf must push an RlpNode");
            }
            ArenaSparseNode::Branch(b) => {
                if let ArenaSparseNodeState::Cached { rlp_node, .. } = &b.state {
                    return rlp_node.clone();
                }
            }
            ArenaSparseNode::Subtrie(_) | ArenaSparseNode::TakenSubtrie => {
                unreachable!("Subtrie/TakenSubtrie should not appear inside a subtrie's own arena");
            }
        }

        cursor.reset(arena, root, base_path);

        // Step 2: Walk dirty branches depth-first using `cursor.next`. Only dirty branches
        // are descended into; all other children (leaves, cached branches, blinded, subtries)
        // are encoded when their parent branch is popped.
        loop {
            let result = cursor.next(&mut *arena, |_, node| {
                matches!(
                    node,
                    ArenaSparseNode::Branch(b) if matches!(b.state, ArenaSparseNodeState::Dirty)
                )
            });

            match result {
                NextResult::Done => break,
                NextResult::NonBranch => {
                    unreachable!("should_descend only returns true for dirty branches")
                }
                NextResult::Branch => {}
            };

            let head = cursor.head().expect("cursor is non-empty");
            let head_idx = head.index;
            let head_path = head.path;

            // The branch at `head_idx` is exhausted. All its dirty child branches
            // have already been encoded and cached. Collect all children's RLP nodes
            // and encode the branch.
            trace!(
                target: TRACE_TARGET,
                branch_path = ?head_path,
                branch_short_key = ?arena[head_idx].short_key().expect("head is a branch"),
                state_mask = ?arena[head_idx].branch_ref().state_mask,
                "Calculating branch RlpNode",
            );

            rlp_node_buf.clear();
            let state_mask = arena[head_idx].branch_ref().state_mask;
            for (child_idx, _nibble) in BranchChildIter::new(state_mask) {
                match &arena[head_idx].branch_ref().children[child_idx] {
                    // Blinded children already carry their RLP inline.
                    ArenaSparseNodeBranchChild::Blinded(rlp_node) => {
                        rlp_node_buf.push(rlp_node.clone());
                    }
                    ArenaSparseNodeBranchChild::Revealed(child_idx) => {
                        let child_idx = *child_idx;
                        match &arena[child_idx] {
                            // Leaves are cheap to encode on the spot.
                            ArenaSparseNode::Leaf { .. } => {
                                Self::encode_leaf(arena, child_idx, rlp_buf, rlp_node_buf);
                            }
                            // Child branches were visited first by the DFS, so their
                            // RLP must already be cached.
                            ArenaSparseNode::Branch(child_b) => {
                                let ArenaSparseNodeState::Cached { rlp_node, .. } = &child_b.state
                                else {
                                    panic!("child branch must be cached after DFS");
                                };
                                rlp_node_buf.push(rlp_node.clone());
                            }
                            // Subtries are hashed separately; read the cached RLP off
                            // their root node.
                            ArenaSparseNode::Subtrie(subtrie) => {
                                let subtrie_root = &subtrie.arena[subtrie.root];
                                match subtrie_root {
                                    ArenaSparseNode::Branch(ArenaSparseNodeBranch {
                                        state: ArenaSparseNodeState::Cached { rlp_node, .. },
                                        ..
                                    }) |
                                    ArenaSparseNode::Leaf {
                                        state: ArenaSparseNodeState::Cached { rlp_node, .. },
                                        ..
                                    } => {
                                        rlp_node_buf.push(rlp_node.clone());
                                    }
                                    _ => panic!("subtrie root must be a cached Branch or Leaf"),
                                }
                            }
                            ArenaSparseNode::TakenSubtrie | ArenaSparseNode::EmptyRoot => {
                                unreachable!("Unexpected child {:?}", arena[child_idx]);
                            }
                        }
                    }
                }
            }

            // Encode the branch, optionally wrapping in an extension if it has a short_key.
            // Snapshot the Copy fields first so the shared borrow of the branch is released.
            let b = arena[head_idx].branch_ref();
            let short_key = b.short_key;
            let state_mask = b.state_mask;
            let prev_branch_masks = b.branch_masks;
            let new_branch_masks = Self::get_branch_masks(arena, b);
            let was_dirty = matches!(b.state, ArenaSparseNodeState::Dirty);

            rlp_buf.clear();
            let rlp_node = BranchNodeRef::new(rlp_node_buf, state_mask).rlp(rlp_buf);

            let rlp_node = if short_key.is_empty() {
                rlp_node
            } else {
                rlp_buf.clear();
                ExtensionNodeRef::new(&short_key, &rlp_node).rlp(rlp_buf)
            };

            trace!(
                target: TRACE_TARGET,
                path = ?head_path,
                short_key = ?arena[head_idx].short_key(),
                children = ?state_mask.iter().zip(rlp_node_buf.iter()).collect::<Vec<_>>(),
                rlp_node = ?rlp_node,
                "Calculated branch RlpNode",
            );

            // Persist the freshly computed RLP and masks on the branch.
            let branch = arena[head_idx].branch_mut();
            branch.state = ArenaSparseNodeState::Cached { rlp_node: rlp_node.clone() };
            branch.branch_masks = new_branch_masks;

            // Record trie updates for dirty branches only.
            // Skip the root node (empty logical path) as PST does.
            if let Some(trie_updates) = updates.as_mut().filter(|_| was_dirty) {
                let mut logical_path = head_path;
                logical_path.extend(&short_key);

                if !logical_path.is_empty() {
                    if !prev_branch_masks.is_empty() && new_branch_masks.is_empty() {
                        // Node had masks before but no longer does → removal.
                        trie_updates.updated_nodes.remove(&logical_path);
                        trie_updates.removed_nodes.insert(logical_path);
                    } else if !new_branch_masks.is_empty() {
                        // Node still has masks → (re-)insertion of the compact form.
                        let compact = arena[head_idx].branch_ref().branch_node_compact(arena);
                        trie_updates.updated_nodes.insert(logical_path, compact);
                        trie_updates.removed_nodes.remove(&logical_path);
                    }
                }
            }
        }

        // The DFS always finishes by caching the root branch (step 1 filtered out
        // every other root kind).
        let ArenaSparseNodeState::Cached { rlp_node, .. } = &arena[root].branch_ref().state else {
            panic!("root must be cached after update_cached_rlp");
        };
        rlp_node.clone()
    }
1302
1303    /// Immutable traversal to find a leaf value at `full_path` starting from `root` in `arena`.
1304    /// `path_offset` is the number of nibbles already consumed from `full_path`.
1305    fn get_leaf_value_in_arena<'a>(
1306        arena: &'a NodeArena,
1307        mut current: Index,
1308        full_path: &Nibbles,
1309        mut path_offset: usize,
1310    ) -> Option<&'a Vec<u8>> {
1311        loop {
1312            match &arena[current] {
1313                ArenaSparseNode::EmptyRoot | ArenaSparseNode::TakenSubtrie => return None,
1314                ArenaSparseNode::Leaf { key, value, .. } => {
1315                    let remaining = full_path.slice(path_offset..);
1316                    return (remaining == *key).then_some(value);
1317                }
1318                ArenaSparseNode::Branch(b) => {
1319                    let short_key = &b.short_key;
1320                    let logical_end = path_offset + short_key.len();
1321                    if full_path.len() <= logical_end ||
1322                        full_path.slice(path_offset..logical_end) != *short_key
1323                    {
1324                        return None;
1325                    }
1326
1327                    let child_nibble = full_path.get_unchecked(logical_end);
1328                    let child_idx = BranchChildIdx::new(b.state_mask, child_nibble)?;
1329                    match &b.children[child_idx] {
1330                        ArenaSparseNodeBranchChild::Blinded(_) => return None,
1331                        ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1332                            current = *child_idx;
1333                            path_offset = logical_end + 1;
1334                        }
1335                    }
1336                }
1337                ArenaSparseNode::Subtrie(subtrie) => {
1338                    return Self::get_leaf_value_in_arena(
1339                        &subtrie.arena,
1340                        subtrie.root,
1341                        full_path,
1342                        path_offset,
1343                    );
1344                }
1345            }
1346        }
1347    }
1348
1349    /// Immutable traversal from the given root in `arena`, following `full_path` to find a leaf.
1350    /// Returns whether the leaf exists or not, or an error if a blinded node is encountered or
1351    /// the value doesn't match.
1352    fn find_leaf_in_arena(
1353        arena: &NodeArena,
1354        mut current: Index,
1355        full_path: &Nibbles,
1356        mut path_offset: usize,
1357        expected_value: Option<&Vec<u8>>,
1358    ) -> Result<LeafLookup, LeafLookupError> {
1359        loop {
1360            match &arena[current] {
1361                ArenaSparseNode::EmptyRoot | ArenaSparseNode::TakenSubtrie => {
1362                    return Ok(LeafLookup::NonExistent);
1363                }
1364                ArenaSparseNode::Leaf { key, value, .. } => {
1365                    let remaining = full_path.slice(path_offset..);
1366                    if remaining != *key {
1367                        return Ok(LeafLookup::NonExistent);
1368                    }
1369                    if let Some(expected) = expected_value &&
1370                        *expected != *value
1371                    {
1372                        return Err(LeafLookupError::ValueMismatch {
1373                            path: *full_path,
1374                            expected: Some(expected.clone()),
1375                            actual: value.clone(),
1376                        });
1377                    }
1378                    return Ok(LeafLookup::Exists);
1379                }
1380                ArenaSparseNode::Branch(b) => {
1381                    let short_key = &b.short_key;
1382                    let logical_end = path_offset + short_key.len();
1383
1384                    if full_path.len() <= logical_end {
1385                        return Ok(LeafLookup::NonExistent);
1386                    }
1387
1388                    if full_path.slice(path_offset..logical_end) != *short_key {
1389                        return Ok(LeafLookup::NonExistent);
1390                    }
1391
1392                    let child_nibble = full_path.get_unchecked(logical_end);
1393                    let Some(child_idx) = BranchChildIdx::new(b.state_mask, child_nibble) else {
1394                        return Ok(LeafLookup::NonExistent);
1395                    };
1396
1397                    match &b.children[child_idx] {
1398                        ArenaSparseNodeBranchChild::Blinded(rlp_node) => {
1399                            let hash = rlp_node
1400                                .as_hash()
1401                                .unwrap_or_else(|| keccak256(rlp_node.as_slice()));
1402                            let mut blinded_path = full_path.slice(..logical_end);
1403                            blinded_path.push_unchecked(child_nibble);
1404                            return Err(LeafLookupError::BlindedNode { path: blinded_path, hash });
1405                        }
1406                        ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1407                            current = *child_idx;
1408                            path_offset = logical_end + 1;
1409                        }
1410                    }
1411                }
1412                ArenaSparseNode::Subtrie(subtrie) => {
1413                    return Self::find_leaf_in_arena(
1414                        &subtrie.arena,
1415                        subtrie.root,
1416                        full_path,
1417                        path_offset,
1418                        expected_value,
1419                    );
1420                }
1421            }
1422        }
1423    }
1424
1425    /// Encodes a leaf node's RLP and pushes it onto `rlp_node_buf`. If the leaf is already
1426    /// cached, its existing `RlpNode` is reused.
1427    fn encode_leaf(
1428        arena: &mut NodeArena,
1429        idx: Index,
1430        rlp_buf: &mut Vec<u8>,
1431        rlp_node_buf: &mut Vec<RlpNode>,
1432    ) {
1433        let (key, value, state) = match &arena[idx] {
1434            ArenaSparseNode::Leaf { key, value, state } => (key, value, state),
1435            _ => unreachable!("encode_leaf called on non-Leaf node"),
1436        };
1437
1438        if let ArenaSparseNodeState::Cached { rlp_node, .. } = state {
1439            rlp_node_buf.push(rlp_node.clone());
1440            return;
1441        }
1442
1443        rlp_buf.clear();
1444        let rlp_node = LeafNodeRef { key, value }.rlp(rlp_buf);
1445
1446        *arena[idx].state_mut() = ArenaSparseNodeState::Cached { rlp_node: rlp_node.clone() };
1447        rlp_node_buf.push(rlp_node);
1448    }
1449
    /// Creates a new leaf and a new branch that splits an existing child from the new leaf at
    /// a divergence point. Returns the index of the new branch.
    ///
    /// `new_leaf_path` is the full remaining path for the new leaf (relative to the split
    /// point's parent).
    ///
    /// The old child's key (leaf) or `short_key` (branch) is truncated to the suffix after the
    /// divergence nibble and its state is set to dirty.
    ///
    /// The top of `stack` must be the leaf or branch being split. The top of stack will be the
    /// newly created branch once this returns.
    /// Returns `true` if the existing node was not already dirty (i.e., the split newly dirtied
    /// it).
    fn split_and_insert_leaf(
        arena: &mut NodeArena,
        cursor: &mut ArenaCursor,
        root: &mut Index,
        new_leaf_path: Nibbles,
        value: &[u8],
    ) -> bool {
        let old_child_entry = cursor.head().expect("cursor must have head");
        let old_child_idx = old_child_entry.index;
        let old_child_short_key = arena[old_child_idx].short_key().expect("top of stack is a leaf");
        // Length of the prefix shared by the new leaf's path and the existing child's key;
        // the new branch is created at exactly this depth.
        let diverge_len = new_leaf_path.common_prefix_length(old_child_short_key);

        trace!(
            target: TRACE_TARGET,
            path = ?old_child_entry.path,
            ?new_leaf_path,
            ?old_child_short_key,
            diverge_len,
            "Splitting node and inserting new leaf",
        );

        // The nibble right after the shared prefix selects the old child's slot under the new
        // branch; everything past that nibble remains as the old child's own key.
        let old_child_nibble = old_child_short_key.get_unchecked(diverge_len);
        let old_child_suffix = old_child_short_key.slice(diverge_len + 1..);

        // Truncate the old child's key/short_key and mark it dirty.
        // Track whether the existing node was not already dirty (a leaf that becomes newly dirty).
        let newly_dirtied_existing = match &mut arena[old_child_idx] {
            ArenaSparseNode::Leaf { key, state, .. } => {
                *key = old_child_suffix;
                let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
                *state = ArenaSparseNodeState::Dirty;
                was_clean
            }
            ArenaSparseNode::Branch(b) => {
                b.short_key = old_child_suffix;
                b.state = b.state.to_dirty();
                // Branches don't contribute to num_dirty_leaves.
                false
            }
            _ => unreachable!("split_and_insert_leaf called on non-Leaf/Branch node"),
        };

        // The new branch inherits the shared prefix as its short key; the new leaf keeps only
        // the suffix past its own divergence nibble.
        let short_key = new_leaf_path.slice(..diverge_len);
        let new_leaf_nibble = new_leaf_path.get_unchecked(diverge_len);
        debug_assert_ne!(old_child_nibble, new_leaf_nibble);

        let new_leaf_idx = arena.insert(ArenaSparseNode::Leaf {
            state: ArenaSparseNodeState::Dirty,
            key: new_leaf_path.slice(diverge_len + 1..),
            value: value.to_vec(),
        });

        // Order the two children by ascending nibble so the child slots line up with the set
        // bits of `state_mask` (the order `BranchChildIdx` indexing assumes).
        let (first_nibble, first_child, second_nibble, second_child) =
            if old_child_nibble < new_leaf_nibble {
                (old_child_nibble, old_child_idx, new_leaf_nibble, new_leaf_idx)
            } else {
                (new_leaf_nibble, new_leaf_idx, old_child_nibble, old_child_idx)
            };

        // Mask with exactly the two occupied nibble positions set.
        let state_mask = TrieMask::from(1u16 << first_nibble | 1u16 << second_nibble);
        let mut children = SmallVec::with_capacity(2);
        children.push(ArenaSparseNodeBranchChild::Revealed(first_child));
        children.push(ArenaSparseNodeBranchChild::Revealed(second_child));

        let new_branch_idx = arena.insert(ArenaSparseNode::Branch(ArenaSparseNodeBranch {
            state: ArenaSparseNodeState::Dirty,
            children,
            state_mask,
            short_key,
            branch_masks: BranchNodeMasks::default(),
        }));

        // Swap the new branch into the old child's position so the cursor head now points at
        // it (updating the root if the split happened there).
        cursor.replace_head_index(arena, root, new_branch_idx);
        newly_dirtied_existing
    }
1538
    /// Performs a leaf upsert using a pre-computed [`SeekResult`] from
    /// [`ArenaCursor::seek`].
    ///
    /// Handles three cases based on `find_result`:
    /// 1. `RevealedLeaf` — the cursor head is a leaf; update in place or split into a branch.
    /// 2. Diverged — the path diverges within the branch's `short_key`, split it.
    /// 3. `NoChild` — the target nibble has no child, insert a new leaf.
    ///
    /// The caller must handle [`SeekResult::Blinded`] and
    /// [`SeekResult::RevealedSubtrie`] before calling this function.
    /// The cursor must be non-empty when called.
    ///
    /// Returns an [`UpsertLeafResult`] and [`SubtrieCounterDeltas`] so the caller can maintain
    /// aggregate counters and decide whether to wrap the result as a subtrie.
    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, fields(full_path = ?full_path))]
    fn upsert_leaf(
        arena: &mut NodeArena,
        cursor: &mut ArenaCursor,
        root: &mut Index,
        full_path: &Nibbles,
        value: &[u8],
        find_result: SeekResult,
    ) -> (UpsertLeafResult, SubtrieCounterDeltas) {
        trace!(target: TRACE_TARGET, ?find_result, "Upserting leaf");
        let head = cursor.head().expect("cursor is non-empty");

        match find_result {
            SeekResult::Blinded => {
                unreachable!("Blinded case must be handled by caller")
            }
            SeekResult::EmptyRoot => {
                // The (sub)trie is empty: overwrite the EmptyRoot slot in place with a dirty
                // leaf carrying the path remainder below the cursor head.
                let head_idx = head.index;
                let head_path = head.path;
                arena[head_idx] = ArenaSparseNode::Leaf {
                    state: ArenaSparseNodeState::Dirty,
                    key: full_path.slice(head_path.len()..),
                    value: value.to_vec(),
                };
                (
                    UpsertLeafResult::NewLeaf,
                    SubtrieCounterDeltas { num_leaves_delta: 1, num_dirty_leaves_delta: 1 },
                )
            }
            SeekResult::RevealedLeaf => {
                // RevealedLeaf guarantees the leaf's full path matches the target exactly.
                let head_idx = head.index;
                let was_clean =
                    if let ArenaSparseNode::Leaf { value: v, state, .. } = &mut arena[head_idx] {
                        // Overwrite in place, reusing the existing value allocation.
                        v.clear();
                        v.extend_from_slice(value);
                        let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
                        *state = ArenaSparseNodeState::Dirty;
                        was_clean
                    } else {
                        unreachable!("RevealedLeaf but cursor head is not a leaf")
                    };
                (
                    UpsertLeafResult::Updated,
                    SubtrieCounterDeltas {
                        num_leaves_delta: 0,
                        // The leaf only counts as newly dirty if it was clean beforehand.
                        num_dirty_leaves_delta: was_clean as i64,
                    },
                )
            }
            SeekResult::Diverged => {
                let head_path = head.path;
                let full_path_from_head = full_path.slice(head_path.len()..);

                let split_dirtied_existing =
                    Self::split_and_insert_leaf(arena, cursor, root, full_path_from_head, value);

                // Depth >= 1 means the newly created branch hangs under a parent; at depth 0
                // the split produced a new root-level node.
                let result = if cursor.depth() >= 1 {
                    UpsertLeafResult::NewChild
                } else {
                    UpsertLeafResult::NewLeaf
                };
                (
                    result,
                    SubtrieCounterDeltas {
                        num_leaves_delta: 1,
                        // The inserted leaf is always dirty; the split may have newly dirtied
                        // the pre-existing leaf as well.
                        num_dirty_leaves_delta: 1 + split_dirtied_existing as i64,
                    },
                )
            }
            SeekResult::NoChild { child_nibble } => {
                let head_idx = head.index;

                // The new leaf's key is the path remainder past the branch's logical path
                // plus the one nibble consumed by the child slot itself.
                let head_branch_logical_path = cursor.head_logical_branch_path(arena);
                let leaf_key = full_path.slice(head_branch_logical_path.len() + 1..);
                let new_leaf = arena.insert(ArenaSparseNode::Leaf {
                    state: ArenaSparseNodeState::Dirty,
                    key: leaf_key,
                    value: value.to_vec(),
                });

                let branch = arena[head_idx].branch_mut();
                branch.set_child(child_nibble, ArenaSparseNodeBranchChild::Revealed(new_leaf));

                // Re-seek to position the cursor on the newly inserted leaf.
                cursor.seek(arena, full_path);

                (
                    UpsertLeafResult::NewChild,
                    SubtrieCounterDeltas { num_leaves_delta: 1, num_dirty_leaves_delta: 1 },
                )
            }
            SeekResult::RevealedSubtrie => {
                unreachable!("RevealedSubtrie must be handled by caller")
            }
        }
    }
1650
    /// Removes a leaf node from the trie using a pre-computed [`SeekResult`] from
    /// [`ArenaCursor::seek`].
    ///
    /// Only the `RevealedLeaf` case performs a removal — the leaf must exist and its full path
    /// must match `full_path`. All other cases (`Diverged`, `NoChild`) are no-ops since the leaf
    /// doesn't exist at that path.
    ///
    /// When removing a leaf from a branch, if the branch is left with only one remaining child,
    /// the branch is collapsed: the remaining child absorbs the branch's `short_key` + the child's
    /// nibble as a prefix to its own key/`short_key`, and replaces the branch in the parent.
    /// If the remaining child is blinded, the collapse cannot proceed and a
    /// [`RemoveLeafResult::NeedsProof`] is returned so the caller can request a proof.
    ///
    /// The caller must handle [`SeekResult::Blinded`] and
    /// [`SeekResult::RevealedSubtrie`] before calling this function.
    fn remove_leaf(
        arena: &mut NodeArena,
        cursor: &mut ArenaCursor,
        root: &mut Index,
        key: B256,
        full_path: &Nibbles,
        find_result: SeekResult,
        updates: &mut Option<SparseTrieUpdates>,
    ) -> (RemoveLeafResult, SubtrieCounterDeltas) {
        match find_result {
            SeekResult::Blinded | SeekResult::RevealedSubtrie => {
                unreachable!("Blinded/RevealedSubtrie must be handled by caller")
            }
            SeekResult::EmptyRoot | SeekResult::Diverged | SeekResult::NoChild { .. } => {
                // Nothing stored at `full_path`: report NotFound with zero counter deltas.
                (RemoveLeafResult::NotFound, SubtrieCounterDeltas::default())
            }
            SeekResult::RevealedLeaf => {
                // RevealedLeaf guarantees the leaf's full path matches the target exactly.
                let head = cursor.head().expect("cursor is non-empty");
                let head_idx = head.index;
                let head_path = head.path;

                trace!(
                    target: TRACE_TARGET,
                    path = ?head_path,
                    ?full_path,
                    "Removing leaf",
                );

                // Before mutating, check if removing this leaf would leave the parent
                // branch with a single blinded sibling (requiring a proof to collapse).
                if let Some(parent_entry) = cursor.parent() {
                    let parent_idx = parent_entry.index;
                    let child_nibble = head_path.last().expect("non-root leaf");
                    let parent_branch = arena[parent_idx].branch_ref();

                    if parent_branch.state_mask.count_bits() == 2 &&
                        parent_branch.sibling_child(child_nibble).is_blinded()
                    {
                        // The sibling is the one set bit in the mask other than our nibble.
                        let sibling_nibble = parent_branch
                            .state_mask
                            .iter()
                            .find(|&n| n != child_nibble)
                            .expect("branch has two children");
                        let mut sibling_path = cursor.parent_logical_branch_path(arena);
                        sibling_path.push_unchecked(sibling_nibble);
                        trace!(target: TRACE_TARGET, ?full_path, ?sibling_path, "Removal would collapse branch onto blinded sibling, requesting proof");
                        return (
                            RemoveLeafResult::NeedsProof {
                                key,
                                // The proof targets the sibling's padded path and must cover
                                // at least the sibling path's length (capped at the 64-nibble
                                // maximum key length).
                                proof_key: Self::nibbles_to_padded_b256(&sibling_path),
                                min_len: (sibling_path.len() as u8).min(64),
                            },
                            SubtrieCounterDeltas::default(),
                        );
                    }
                }

                // Check if the removed leaf was dirty before removing it.
                let removed_was_dirty =
                    matches!(arena[head_idx].state_ref(), Some(ArenaSparseNodeState::Dirty));

                if cursor.depth() == 0 {
                    // The leaf is the root — replace with EmptyRoot and reset the cursor
                    // so subsequent iterations can call seek normally.
                    arena.remove(head_idx);
                    *root = arena.insert(ArenaSparseNode::EmptyRoot);
                    cursor.reset(arena, *root, head_path);
                    return (
                        RemoveLeafResult::Removed,
                        SubtrieCounterDeltas {
                            num_leaves_delta: -1,
                            num_dirty_leaves_delta: -(removed_was_dirty as i64),
                        },
                    );
                }

                // Pop the leaf entry, propagating dirty state to the parent.
                cursor.pop(arena);

                // The parent must be a branch. Remove the leaf from it.
                let parent_entry = cursor.head().expect("cursor is non-empty");
                let parent_idx = parent_entry.index;
                let child_nibble = head_path.last().expect("non-root leaf");

                // Remove the leaf from the arena and from the parent's children.
                arena.remove(head_idx);
                let parent_branch = arena[parent_idx].branch_mut();
                parent_branch.remove_child(child_nibble);

                // If the branch now has only one child, collapse it. The blinded sibling
                // case was already handled above before any mutations.
                let collapse_dirtied_leaf = if parent_branch.state_mask.count_bits() == 1 {
                    Self::collapse_branch(arena, cursor, root, updates)
                } else {
                    false
                };
                (
                    RemoveLeafResult::Removed,
                    SubtrieCounterDeltas {
                        num_leaves_delta: -1,
                        // A collapse may newly dirty the surviving leaf, while the removal
                        // gives back the removed leaf's dirty count (if it was dirty).
                        num_dirty_leaves_delta: (collapse_dirtied_leaf as i64) -
                            (removed_was_dirty as i64),
                    },
                )
            }
        }
    }
1774
1775    /// Checks whether a subtrie receiving only removals would cause its parent branch to collapse
1776    /// onto a single blinded sibling. If so, returns the proof needed to reveal that blinded
1777    /// sibling so the caller can request it and skip the subtrie's updates.
1778    ///
1779    /// Returns `Some(proof)` for the blinded sibling when the edge-case applies, `None` otherwise.
1780    fn check_subtrie_collapse_needs_proof(
1781        arena: &NodeArena,
1782        cursor: &ArenaCursor,
1783        subtrie_updates: &[(B256, Nibbles, LeafUpdate)],
1784    ) -> Option<ArenaRequiredProof> {
1785        let num_removals = subtrie_updates
1786            .iter()
1787            .filter(|(_, _, u)| matches!(u, LeafUpdate::Changed(v) if v.is_empty()))
1788            .count() as u64;
1789
1790        // Touched is a no-op that doesn't alter trie structure, so it must be
1791        // excluded when deciding whether "all updates are removals". This mirrors
1792        // the `all_removals` / `might_empty_subtrie` filter in `update_leaves`.
1793        // Without this, a batch of removals + Touched entries
1794        // would fail the `num_removals != num_changed` check, skip the proof
1795        // request for the blinded sibling, and later panic in
1796        // `maybe_collapse_or_remove_branch` when the subtrie empties inline.
1797        let num_changed =
1798            subtrie_updates.iter().filter(|(_, _, u)| matches!(u, LeafUpdate::Changed(_))).count()
1799                as u64;
1800
1801        if num_removals == 0 || num_removals != num_changed {
1802            return None;
1803        }
1804
1805        // The subtrie is the cursor head; its parent is the cursor's parent.
1806        let subtrie_entry = cursor.head()?;
1807        let subtrie_num_leaves = match &arena[subtrie_entry.index] {
1808            ArenaSparseNode::Subtrie(s) => s.num_leaves,
1809            _ => return None,
1810        };
1811        if num_removals < subtrie_num_leaves {
1812            return None;
1813        }
1814
1815        let child_nibble =
1816            subtrie_entry.path.last().expect("subtrie path must have at least one nibble");
1817
1818        let parent_entry = cursor.parent()?;
1819        let parent_branch = arena[parent_entry.index].branch_ref();
1820        if parent_branch.state_mask.count_bits() != 2 {
1821            return None;
1822        }
1823
1824        if !parent_branch.sibling_child(child_nibble).is_blinded() {
1825            return None;
1826        }
1827
1828        let sibling_nibble = parent_branch
1829            .state_mask
1830            .iter()
1831            .find(|&n| n != child_nibble)
1832            .expect("branch has two children");
1833        let mut sibling_path = cursor.parent_logical_branch_path(arena);
1834        sibling_path.push_unchecked(sibling_nibble);
1835
1836        Some(ArenaRequiredProof {
1837            key: Self::nibbles_to_padded_b256(&sibling_path),
1838            min_len: (sibling_path.len() as u8).min(64),
1839        })
1840    }
1841
    /// Collapses a branch node that has exactly one remaining revealed child. The branch's
    /// `short_key`, the remaining child's nibble, and the child's own key/`short_key` are
    /// concatenated to form the child's new key/`short_key`. The child then replaces the branch
    /// in the grandparent (or becomes the new root).
    ///
    /// The caller must verify that the remaining child is not blinded before calling this function.
    ///
    /// The branch being collapsed must be the current cursor head. The cursor head will be
    /// replaced with the remaining child which has taken its place.
    /// Returns `true` if the collapse dirtied a surviving leaf that was not already dirty.
    fn collapse_branch(
        arena: &mut NodeArena,
        cursor: &mut ArenaCursor,
        root: &mut Index,
        updates: &mut Option<SparseTrieUpdates>,
    ) -> bool {
        let branch_entry = cursor.head().expect("cursor is non-empty");
        let branch_idx = branch_entry.index;
        let branch = arena[branch_idx].branch_ref();
        let remaining_nibble =
            branch.state_mask.iter().next().expect("branch has at least one child");
        let branch_short_key = branch.short_key;

        debug_assert_eq!(
            branch.state_mask.count_bits(),
            1,
            "collapse_branch requires exactly 1 child"
        );
        debug_assert!(
            !branch.children[0].is_blinded(),
            "collapse_branch called with a blinded remaining child"
        );

        trace!(
            target: TRACE_TARGET,
            path = ?branch_entry.path,
            short_key = ?branch_short_key,
            branch_masks = ?branch.branch_masks,
            ?remaining_nibble,
            "Collapsing single-child branch",
        );

        // Record the collapsed branch's logical path for trie update tracking if it
        // was previously persisted in the DB trie.
        if let Some(trie_updates) = updates.as_mut() &&
            !branch.branch_masks.is_empty()
        {
            let logical_path = cursor.head_logical_branch_path(arena);
            if !logical_path.is_empty() {
                trie_updates.updated_nodes.remove(&logical_path);
                trie_updates.removed_nodes.insert(logical_path);
            }
        }

        // Build the prefix: branch's short_key + remaining child's nibble.
        let mut prefix = branch_short_key;
        prefix.push_unchecked(remaining_nibble);

        // With a single remaining child, slot 0 is the only occupied child slot.
        let ArenaSparseNodeBranchChild::Revealed(child_idx) = branch.children[0] else {
            unreachable!()
        };

        // Prepend the prefix to the child's key/short_key and mark dirty.
        // Track whether a leaf was newly dirtied by this collapse.
        let newly_dirtied_leaf = match &mut arena[child_idx] {
            ArenaSparseNode::Leaf { key, state, .. } => {
                let mut new_key = prefix;
                new_key.extend(key);
                *key = new_key;
                let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
                *state = ArenaSparseNodeState::Dirty;
                was_clean
            }
            ArenaSparseNode::Branch(b) => {
                let mut new_short_key = prefix;
                new_short_key.extend(&b.short_key);
                b.short_key = new_short_key;
                b.state = b.state.to_dirty();
                false
            }
            ArenaSparseNode::Subtrie(subtrie) => {
                // The surviving child is a subtrie: its recorded path moves up to the
                // collapsed branch's position, and the prefix is prepended to the subtrie's
                // own root node (inside the subtrie's private arena).
                subtrie.path = branch_entry.path;
                match &mut subtrie.arena[subtrie.root] {
                    ArenaSparseNode::Branch(b) => {
                        let mut new_short_key = prefix;
                        new_short_key.extend(&b.short_key);
                        b.short_key = new_short_key;
                        b.state = b.state.to_dirty();
                    }
                    ArenaSparseNode::Leaf { key, state, .. } => {
                        let mut new_key = prefix;
                        new_key.extend(key);
                        *key = new_key;
                        let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
                        *state = ArenaSparseNodeState::Dirty;
                        if was_clean {
                            subtrie.num_dirty_leaves += 1;
                        }
                    }
                    _ => {
                        unreachable!("subtrie root must be a Branch or Leaf during collapse_branch")
                    }
                }
                // The subtrie accounts for its own dirty-leaf count above, so report no
                // newly dirtied leaf to the caller.
                false
            }
            _ => unreachable!("remaining child must be Leaf, Branch, or Subtrie"),
        };

        // Replace the branch with the remaining child in the grandparent (or root).
        cursor.replace_head_index(arena, root, child_idx);

        // Free the collapsed branch.
        arena.remove(branch_idx);
        newly_dirtied_leaf
    }
1957
1958    /// Counts the total leaves and dirty leaves in a subtree rooted at `idx`.
1959    fn count_leaves_and_dirty(arena: &NodeArena, idx: Index) -> (u64, u64) {
1960        match &arena[idx] {
1961            ArenaSparseNode::Leaf { state, .. } => {
1962                let dirty = matches!(state, ArenaSparseNodeState::Dirty) as u64;
1963                (1, dirty)
1964            }
1965            ArenaSparseNode::Branch(b) => {
1966                let mut leaves = 0u64;
1967                let mut dirty = 0u64;
1968                for c in &b.children {
1969                    if let ArenaSparseNodeBranchChild::Revealed(child_idx) = c {
1970                        let (l, d) = Self::count_leaves_and_dirty(arena, *child_idx);
1971                        leaves += l;
1972                        dirty += d;
1973                    }
1974                }
1975                (leaves, dirty)
1976            }
1977            _ => (0, 0),
1978        }
1979    }
1980
1981    /// Asserts that every node in the upper arena satisfies the subtrie structure invariant:
1982    /// - Nodes at `UPPER_TRIE_MAX_DEPTH` path length must be `Subtrie` (or `TakenSubtrie`).
1983    /// - Nodes at other depths must NOT be `Subtrie`.
1984    ///
1985    /// Uses the cursor to DFS the upper arena, checking each visited node's path length.
1986    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
1987    #[cfg(debug_assertions)]
1988    fn debug_assert_subtrie_structure(&mut self) {
1989        let mut cursor = mem::take(&mut self.buffers.cursor);
1990        cursor.reset(&self.upper_arena, self.root, Nibbles::default());
1991
1992        loop {
1993            let result = cursor.next(&mut self.upper_arena, |_, _| true);
1994            match result {
1995                NextResult::Done => break,
1996                NextResult::NonBranch | NextResult::Branch => {
1997                    let head = cursor.head().expect("cursor is non-empty");
1998                    let path_len = head.path.len();
1999                    let node = &self.upper_arena[head.index];
2000
2001                    if Self::should_be_subtrie(path_len) {
2002                        debug_assert!(
2003                            matches!(
2004                                node,
2005                                ArenaSparseNode::Subtrie(_) | ArenaSparseNode::TakenSubtrie
2006                            ),
2007                            "node at path_len={path_len} should be a Subtrie but is {node:?}",
2008                        );
2009                    } else {
2010                        debug_assert!(
2011                            !matches!(node, ArenaSparseNode::Subtrie(_)),
2012                            "node at path_len={path_len} should NOT be a Subtrie but is",
2013                        );
2014                    }
2015                }
2016            }
2017        }
2018
2019        self.buffers.cursor = cursor;
2020    }
2021
2022    /// Recursively migrates all nodes from `src` into `dst`, starting at `src_idx`.
2023    /// Branch children's `Revealed` indices are remapped to the new `dst` indices during
2024    /// the migration.
2025    ///
2026    /// If `dst_slot` is `Some(idx)`, the node at `src_idx` is placed into `dst[idx]`
2027    /// (overwriting); otherwise a new slot is allocated. Returns the `dst` index of the
2028    /// migrated node.
2029    fn migrate_nodes(
2030        dst: &mut NodeArena,
2031        src: &mut NodeArena,
2032        src_idx: Index,
2033        dst_slot: Option<Index>,
2034    ) -> Index {
2035        let mut node = src.remove(src_idx).expect("node exists in source arena");
2036
2037        // Recursively migrate children first so their new indices are known.
2038        if let ArenaSparseNode::Branch(b) = &mut node {
2039            for child in &mut b.children {
2040                if let ArenaSparseNodeBranchChild::Revealed(child_idx) = child {
2041                    *child_idx = Self::migrate_nodes(dst, src, *child_idx, None);
2042                }
2043            }
2044        }
2045
2046        if let Some(slot) = dst_slot {
2047            dst[slot] = node;
2048            slot
2049        } else {
2050            dst.insert(node)
2051        }
2052    }
2053
2054    /// Removes a pruned node from the arena and blinds the parent's child slot with `cached_rlp`.
2055    ///
2056    /// Callers must only invoke this for nodes whose cached RLP is available.
2057    fn remove_pruned_node(
2058        arena: &mut NodeArena,
2059        cursor: &ArenaCursor,
2060        idx: Index,
2061        nibble: Option<u8>,
2062        cached_rlp: RlpNode,
2063    ) -> ArenaSparseNode {
2064        trace!(target: TRACE_TARGET, path = ?cursor.head().unwrap().path, "pruning node");
2065
2066        {
2067            let parent_idx = cursor.parent().expect("pruned child has parent").index;
2068            let child_nibble = nibble.expect("non-root child");
2069            let parent_branch = arena[parent_idx].branch_mut();
2070            let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
2071                .expect("child nibble not found in parent state_mask");
2072            debug_assert!(
2073                matches!(
2074                    parent_branch.children[child_idx],
2075                    ArenaSparseNodeBranchChild::Revealed(revealed_idx) if revealed_idx == idx
2076                ),
2077                "parent child slot at nibble {child_nibble} does not point to pruned node"
2078            );
2079            parent_branch.children[child_idx] = ArenaSparseNodeBranchChild::Blinded(cached_rlp);
2080        }
2081
2082        arena.remove(idx).expect("node must exist to be pruned")
2083    }
2084
    /// Reveals a single proof node using a pre-computed [`SeekResult`] from
    /// [`ArenaCursor::seek`].
    ///
    /// If the result is `Blinded`, the blinded child is replaced with the proof node (converted to
    /// an arena node with `Cached` state). All other cases (already revealed, no child, diverged,
    /// leaf head) are no-ops — the proof node is skipped.
    ///
    /// Returns the `Index` of the revealed node in the arena, if any was revealed.
    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
    fn reveal_node(
        arena: &mut NodeArena,
        cursor: &ArenaCursor,
        node: &mut ProofTrieNodeV2,
        find_result: SeekResult,
    ) -> Option<Index> {
        let SeekResult::Blinded = find_result else {
            // Already revealed, no child slot, or diverged — skip this proof node.
            return None;
        };

        // The cursor head is the branch whose blinded child slot will receive the node.
        let head = cursor.head().expect("cursor is non-empty");
        let head_idx = head.index;
        let head_branch_logical_path = cursor.head_logical_branch_path(arena);

        debug_assert_eq!(
            node.path.len(),
            head_branch_logical_path.len() + 1,
            "proof node path {:?} is not a direct child of branch at {:?} (expected depth {})",
            node.path,
            head_branch_logical_path,
            head_branch_logical_path.len() + 1,
        );

        // The nibble right after the branch's logical path selects the child slot.
        let child_nibble = node.path.get_unchecked(head_branch_logical_path.len());
        let head_branch = arena[head_idx].branch_ref();
        let dense_child_idx = BranchChildIdx::new(head_branch.state_mask, child_nibble)
            .expect("Blinded result but child nibble not in state_mask");

        // Clone the blinded RLP out so it can seed the revealed node's cached state;
        // if the slot is somehow already revealed, there is nothing to do.
        let cached_rlp = match &head_branch.children[dense_child_idx] {
            ArenaSparseNodeBranchChild::Blinded(rlp) => rlp.clone(),
            ArenaSparseNodeBranchChild::Revealed(_) => return None,
        };

        trace!(
            target: TRACE_TARGET,
            path = ?node.path,
            rlp_node = ?cached_rlp,
            "Revealing node",
        );

        // Take ownership of the proof node, leaving an empty placeholder in the slice.
        let proof_node = mem::replace(node, ProofTrieNodeV2::empty());
        let mut arena_node = ArenaSparseNode::from_proof_node(proof_node);

        // The node starts out Cached: its RLP is exactly the blinded hash it replaces.
        let state = arena_node.state_mut();
        *state = ArenaSparseNodeState::Cached { rlp_node: cached_rlp };

        // Insert the new node and re-point the parent's child slot at it.
        let child_idx = arena.insert(arena_node);
        arena[head_idx].branch_mut().children[dense_child_idx] =
            ArenaSparseNodeBranchChild::Revealed(child_idx);

        Some(child_idx)
    }
2147
2148    #[cfg(debug_assertions)]
2149    fn collect_reachable_nodes(arena: &NodeArena, idx: Index, reachable: &mut HashSet<Index>) {
2150        if !reachable.insert(idx) {
2151            return;
2152        }
2153        if let ArenaSparseNode::Branch(b) = &arena[idx] {
2154            for child in &b.children {
2155                if let ArenaSparseNodeBranchChild::Revealed(child_idx) = child {
2156                    Self::collect_reachable_nodes(arena, *child_idx, reachable);
2157                }
2158            }
2159        }
2160    }
2161
2162    #[cfg(debug_assertions)]
2163    fn assert_no_orphaned_nodes(arena: &NodeArena, root: Index, label: &str) {
2164        let mut reachable = HashSet::default();
2165        Self::collect_reachable_nodes(arena, root, &mut reachable);
2166        let all_indices: HashSet<Index> = arena.iter().map(|(idx, _)| idx).collect();
2167        let orphaned: Vec<_> = all_indices.difference(&reachable).collect();
2168        debug_assert!(
2169            orphaned.is_empty(),
2170            "{label} has {} orphaned node(s): {orphaned:?}",
2171            orphaned.len(),
2172        );
2173    }
2174}
2175
2176#[cfg(debug_assertions)]
2177impl Drop for ArenaParallelSparseTrie {
2178    fn drop(&mut self) {
2179        Self::assert_no_orphaned_nodes(&self.upper_arena, self.root, "upper arena");
2180
2181        for (_, node) in &self.upper_arena {
2182            if let Some(subtrie) = node.as_subtrie() {
2183                Self::assert_no_orphaned_nodes(
2184                    &subtrie.arena,
2185                    subtrie.root,
2186                    &alloc::format!("subtrie {:?}", subtrie.path),
2187                );
2188            }
2189        }
2190    }
2191}
2192
2193impl Default for ArenaParallelSparseTrie {
2194    fn default() -> Self {
2195        let mut upper_arena = SlotMap::new();
2196        let root = upper_arena.insert(ArenaSparseNode::EmptyRoot);
2197        Self {
2198            upper_arena,
2199            root,
2200            updates: None,
2201            buffers: ArenaTrieBuffers::default(),
2202            parallelism_thresholds: ArenaParallelismThresholds::default(),
2203            #[cfg(feature = "trie-debug")]
2204            debug_recorder: Default::default(),
2205        }
2206    }
2207}
2208
2209impl ArenaParallelSparseTrie {
2210    /// Hashes a subtrie at `head_idx` and collects its update actions.
2211    fn update_upper_subtrie(&mut self, head_idx: Index) {
2212        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[head_idx] else {
2213            unreachable!()
2214        };
2215
2216        if !subtrie.arena[subtrie.root].is_cached() {
2217            subtrie.update_cached_rlp();
2218        }
2219
2220        Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
2221    }
2222}
2223
2224impl SparseTrie for ArenaParallelSparseTrie {
2225    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2226    fn set_root(
2227        &mut self,
2228        root: TrieNodeV2,
2229        masks: Option<BranchNodeMasks>,
2230        retain_updates: bool,
2231    ) -> SparseTrieResult<()> {
2232        #[cfg(feature = "trie-debug")]
2233        self.debug_recorder.record(RecordedOp::SetRoot {
2234            node: ProofTrieNodeRecord::from_proof_trie_node_v2(&ProofTrieNodeV2 {
2235                path: Nibbles::default(),
2236                node: root.clone(),
2237                masks,
2238            }),
2239        });
2240
2241        debug_assert!(
2242            matches!(self.upper_arena[self.root], ArenaSparseNode::EmptyRoot),
2243            "set_root called on a trie that already has revealed nodes"
2244        );
2245
2246        self.set_updates(retain_updates);
2247
2248        match root {
2249            TrieNodeV2::EmptyRoot => {
2250                trace!(target: TRACE_TARGET, "Setting empty root");
2251            }
2252            TrieNodeV2::Leaf(leaf) => {
2253                trace!(target: TRACE_TARGET, key = ?leaf.key, "Setting leaf root");
2254                self.upper_arena[self.root] = ArenaSparseNode::Leaf {
2255                    state: ArenaSparseNodeState::Revealed,
2256                    key: leaf.key,
2257                    value: leaf.value,
2258                };
2259            }
2260            TrieNodeV2::Branch(branch) => {
2261                trace!(target: TRACE_TARGET, state_mask = ?branch.state_mask, num_children = branch.state_mask.count_bits(), "Setting branch root");
2262                let mut children = SmallVec::with_capacity(branch.state_mask.count_bits() as usize);
2263                for (stack_ptr, _nibble) in branch.state_mask.iter().enumerate() {
2264                    children
2265                        .push(ArenaSparseNodeBranchChild::Blinded(branch.stack[stack_ptr].clone()));
2266                }
2267
2268                self.upper_arena[self.root] = ArenaSparseNode::Branch(ArenaSparseNodeBranch {
2269                    state: ArenaSparseNodeState::Revealed,
2270                    children,
2271                    state_mask: branch.state_mask,
2272                    short_key: branch.key,
2273                    branch_masks: masks.unwrap_or_default(),
2274                });
2275            }
2276            TrieNodeV2::Extension(_) => {
2277                panic!("set_root does not support Extension nodes; extensions are represented as branches with a short_key")
2278            }
2279        }
2280
2281        Ok(())
2282    }
2283
2284    fn set_updates(&mut self, retain_updates: bool) {
2285        self.updates = retain_updates.then(Default::default);
2286        if retain_updates {
2287            self.buffers.updates.get_or_insert_with(SparseTrieUpdates::default).clear();
2288        } else {
2289            self.buffers.updates = None;
2290        }
2291    }
2292
    /// Pre-allocates capacity for `additional` nodes in the upper arena.
    /// Subtrie arenas are not touched here; they grow on demand.
    fn reserve_nodes(&mut self, additional: usize) {
        self.upper_arena.reserve(additional);
    }
2296
2297    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, fields(num_nodes = nodes.len()))]
2298    fn reveal_nodes(&mut self, nodes: &mut [ProofTrieNodeV2]) -> SparseTrieResult<()> {
2299        if nodes.is_empty() {
2300            return Ok(());
2301        }
2302
2303        #[cfg(feature = "trie-debug")]
2304        self.debug_recorder.record(RecordedOp::RevealNodes {
2305            nodes: nodes.iter().map(ProofTrieNodeRecord::from_proof_trie_node_v2).collect(),
2306        });
2307
2308        if matches!(self.upper_arena[self.root], ArenaSparseNode::EmptyRoot) {
2309            trace!(target: TRACE_TARGET, "Skipping reveal_nodes on empty root");
2310            return Ok(());
2311        }
2312
2313        // Sort nodes lexicographically by path.
2314        nodes.sort_unstable_by_key(|n| n.path);
2315
2316        let threshold = self.parallelism_thresholds.min_revealed_nodes;
2317
2318        // Take the cursor out to avoid borrow conflicts with `self`.
2319        let mut cursor = mem::take(&mut self.buffers.cursor);
2320        cursor.reset(&self.upper_arena, self.root, Nibbles::default());
2321
2322        // Skip root node if present (set_root handles the root).
2323        let mut node_idx = if nodes[0].path.is_empty() { 1 } else { 0 };
2324
2325        // Walk the upper trie, revealing upper nodes inline and collecting subtrie work.
2326        // Subtries with enough nodes to reveal are taken for parallel processing; the rest
2327        // are revealed inline.
2328        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>, Vec<ProofTrieNodeV2>)> = Vec::new();
2329
2330        while node_idx < nodes.len() {
2331            let find_result = cursor.seek(&mut self.upper_arena, &nodes[node_idx].path);
2332
2333            match find_result {
2334                SeekResult::RevealedLeaf => {
2335                    trace!(target: TRACE_TARGET, path = ?nodes[node_idx].path, "Skipping reveal: leaf head");
2336                    node_idx += 1;
2337                }
2338                SeekResult::Blinded => {
2339                    // Save the proof node's path before reveal_node consumes it.
2340                    let child_path = nodes[node_idx].path;
2341                    let child_idx = Self::reveal_node(
2342                        &mut self.upper_arena,
2343                        &cursor,
2344                        &mut nodes[node_idx],
2345                        SeekResult::Blinded,
2346                    );
2347                    node_idx += 1;
2348
2349                    if let Some(child_idx) = child_idx {
2350                        self.maybe_wrap_in_subtrie(child_idx, &child_path);
2351                    }
2352                }
2353                SeekResult::RevealedSubtrie => {
2354                    let subtrie_entry = cursor.head().expect("cursor is non-empty");
2355                    let child_idx = subtrie_entry.index;
2356                    let prefix = subtrie_entry.path;
2357
2358                    let subtrie_start = node_idx;
2359                    while node_idx < nodes.len() && nodes[node_idx].path.starts_with(&prefix) {
2360                        node_idx += 1;
2361                    }
2362                    let num_subtrie_nodes = node_idx - subtrie_start;
2363
2364                    if num_subtrie_nodes >= threshold {
2365                        // Take subtrie for parallel reveal.
2366                        trace!(target: TRACE_TARGET, ?prefix, num_subtrie_nodes, "Taking subtrie for parallel reveal");
2367                        let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
2368                            &mut self.upper_arena[child_idx],
2369                            ArenaSparseNode::TakenSubtrie,
2370                        ) else {
2371                            unreachable!("RevealedSubtrie must point to a Subtrie node")
2372                        };
2373                        let node_vec: Vec<ProofTrieNodeV2> = (subtrie_start..node_idx)
2374                            .map(|i| mem::replace(&mut nodes[i], ProofTrieNodeV2::empty()))
2375                            .collect();
2376                        taken.push((child_idx, subtrie, node_vec));
2377                    } else {
2378                        // Reveal inline.
2379                        trace!(target: TRACE_TARGET, ?prefix, num_subtrie_nodes, "Revealing subtrie inline");
2380                        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[child_idx]
2381                        else {
2382                            unreachable!("RevealedSubtrie must point to a Subtrie node")
2383                        };
2384                        let mut subtrie_nodes: Vec<ProofTrieNodeV2> = (subtrie_start..node_idx)
2385                            .map(|i| mem::replace(&mut nodes[i], ProofTrieNodeV2::empty()))
2386                            .collect();
2387                        subtrie.reveal_nodes(&mut subtrie_nodes)?;
2388                    }
2389                }
2390                _ => {
2391                    trace!(target: TRACE_TARGET, path = ?nodes[node_idx].path, ?find_result, "Skipping reveal: no blinded child");
2392                    node_idx += 1;
2393                }
2394            }
2395        }
2396
2397        // Drain remaining cursor entries from the upper-trie walk.
2398        cursor.drain(&mut self.upper_arena);
2399        self.buffers.cursor = cursor;
2400
2401        if taken.is_empty() {
2402            return Ok(());
2403        }
2404
2405        // Reveal taken subtries, in parallel if more than one.
2406        if taken.len() == 1 {
2407            let (_, subtrie, node_vec) = &mut taken[0];
2408            subtrie.reveal_nodes(node_vec)?;
2409        } else {
2410            use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
2411
2412            let results: Vec<SparseTrieResult<()>> = taken
2413                .par_iter_mut()
2414                .map(|(_, subtrie, node_vec)| subtrie.reveal_nodes(node_vec))
2415                .collect();
2416
2417            if let Some(err) = results.into_iter().find(|r| r.is_err()) {
2418                // Restore before returning so we don't leave TakenSubtrie holes.
2419                for (idx, subtrie, _) in taken {
2420                    self.upper_arena[idx] = ArenaSparseNode::Subtrie(subtrie);
2421                }
2422                return err;
2423            }
2424        }
2425
2426        // Restore taken subtries into the upper arena.
2427        for (idx, subtrie, _) in taken {
2428            self.upper_arena[idx] = ArenaSparseNode::Subtrie(subtrie);
2429        }
2430
2431        #[cfg(debug_assertions)]
2432        self.debug_assert_subtrie_structure();
2433
2434        Ok(())
2435    }
2436
    /// Unsupported: this implementation only processes leaf changes in batches.
    fn update_leaf<P: crate::provider::TrieNodeProvider>(
        &mut self,
        _full_path: Nibbles,
        _value: Vec<u8>,
        _provider: P,
    ) -> SparseTrieResult<()> {
        // Intentional stub — callers must use the batch API instead.
        unimplemented!("ArenaParallelSparseTrie uses update_leaves for batch leaf updates")
    }
2445
    /// Unsupported: this implementation only processes leaf removals in batches.
    fn remove_leaf<P: crate::provider::TrieNodeProvider>(
        &mut self,
        _full_path: &Nibbles,
        _provider: P,
    ) -> SparseTrieResult<()> {
        // Intentional stub — callers must use the batch API instead.
        unimplemented!("ArenaParallelSparseTrie uses update_leaves for batch leaf removals")
    }
2453
    /// Computes the trie root hash, first hashing dirty subtries and then the
    /// upper trie, while folding all retained updates into `self.updates`.
    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, ret)]
    fn root(&mut self) -> B256 {
        #[cfg(feature = "trie-debug")]
        self.debug_recorder.record(RecordedOp::Root);

        // Hash all dirty subtries first (possibly in parallel) so the upper-trie
        // pass below only sees cached subtrie roots.
        self.update_subtrie_hashes();

        // Merge buffered subtrie updates into self.updates before hashing the upper trie,
        // which will add its own updates directly into self.updates.
        Self::merge_subtrie_updates(&mut self.updates, &mut self.buffers.updates);

        let rlp_node = Self::update_cached_rlp(
            &mut self.upper_arena,
            self.root,
            &mut self.buffers.cursor,
            &mut self.buffers.rlp_buf,
            &mut self.buffers.rlp_node_buf,
            Nibbles::default(),
            &mut self.updates,
        );

        // The root must encode as a hash; a short inline encoding would panic here.
        rlp_node.as_hash().expect("root RlpNode must be a hash")
    }
2477
    /// Returns `true` if the root node's RLP is already cached (nothing dirty to hash).
    fn is_root_cached(&self) -> bool {
        self.upper_arena[self.root].is_cached()
    }
2481
    /// Re-hashes every subtrie with dirty leaves (in parallel when the global dirty
    /// count meets the threshold) and merges their buffered updates upward.
    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
    fn update_subtrie_hashes(&mut self) {
        #[cfg(feature = "trie-debug")]
        self.debug_recorder.record(RecordedOp::UpdateSubtrieHashes);

        trace!(target: TRACE_TARGET, "Updating subtrie hashes");

        // Only descend if the root is a branch; otherwise there are no subtries.
        if !matches!(&self.upper_arena[self.root], ArenaSparseNode::Branch(_)) {
            return;
        }

        // Count total dirty leaves across all subtries to make a global parallelism
        // decision, matching the approach in ParallelSparseTrie.
        let mut total_dirty_leaves: u64 = 0;
        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>)> = Vec::new();
        for (idx, node) in &mut self.upper_arena {
            let ArenaSparseNode::Subtrie(s) = node else { continue };
            // Clean subtries keep their cached RLP; only dirty ones need re-hashing.
            if s.num_dirty_leaves == 0 {
                continue;
            }
            total_dirty_leaves += s.num_dirty_leaves;
            // Swap in a TakenSubtrie placeholder so the arena stays structurally
            // complete while the subtrie is hashed outside of it.
            let ArenaSparseNode::Subtrie(subtrie) =
                mem::replace(node, ArenaSparseNode::TakenSubtrie)
            else {
                unreachable!()
            };
            taken.push((idx, subtrie));
        }

        // Hash taken subtries: in parallel if total dirty leaves meet the threshold,
        // otherwise serially. This mirrors ParallelSparseTrie's all-or-nothing approach.
        if !taken.is_empty() {
            if taken.len() == 1 || total_dirty_leaves < self.parallelism_thresholds.min_dirty_leaves
            {
                for (_, subtrie) in &mut taken {
                    subtrie.update_cached_rlp();
                }
            } else {
                use rayon::iter::{IntoParallelIterator, ParallelIterator};

                taken = taken
                    .into_par_iter()
                    .map(|(idx, mut subtrie)| {
                        subtrie.update_cached_rlp();
                        (idx, subtrie)
                    })
                    .collect();
            }
        }

        // If the root branch is already cached and nothing was taken for parallel
        // hashing, there are no dirty subtries to process.
        if taken.is_empty() && self.upper_arena[self.root].is_cached() {
            return;
        }

        // Walk the upper trie depth-first, restoring hashed subtries and inline-hashing
        // any remaining dirty subtries. Only descend into dirty branches; clean subtrees
        // cannot contain dirty subtries since dirty state propagates upward.
        //
        // Sorting by `Reverse(path)` makes the `pop()` below yield subtries in
        // ascending path order to pair with the walk; the debug_assert_eq checks
        // that pairing on every restore.
        taken.sort_unstable_by_key(|(_, b)| Reverse(b.path));

        self.buffers.cursor.reset(&self.upper_arena, self.root, Nibbles::default());

        loop {
            // Descend into dirty branches/subtries and every TakenSubtrie placeholder.
            let result = self.buffers.cursor.next(&mut self.upper_arena, |_, child| match child {
                ArenaSparseNode::Branch(_) | ArenaSparseNode::Subtrie(_) => !child.is_cached(),
                ArenaSparseNode::TakenSubtrie => true,
                _ => false,
            });

            match result {
                NextResult::Done => break,
                NextResult::Branch => continue,
                NextResult::NonBranch => {}
            }

            // Head is a subtrie or taken-subtrie — process it.
            let head_idx = self.buffers.cursor.head().expect("cursor is non-empty").index;

            if matches!(&self.upper_arena[head_idx], ArenaSparseNode::TakenSubtrie) {
                let (_, subtrie) = taken.pop().expect("taken subtries must not be exhausted");
                debug_assert_eq!(
                    subtrie.path,
                    self.buffers.cursor.head().expect("cursor is non-empty").path,
                    "taken subtrie path mismatch",
                );
                self.upper_arena[head_idx] = ArenaSparseNode::Subtrie(subtrie);
            }

            // Propagate the subtrie's (now cached) RLP and merge its buffered updates.
            self.update_upper_subtrie(head_idx);
        }
    }
2575
    /// Looks up the leaf value at `full_path`, delegating the walk (starting at the
    /// root, depth 0) to `get_leaf_value_in_arena`.
    fn get_leaf_value(&self, full_path: &Nibbles) -> Option<&Vec<u8>> {
        Self::get_leaf_value_in_arena(&self.upper_arena, self.root, full_path, 0)
    }
2579
    /// Searches for a leaf at `full_path`, delegating the walk (and any comparison
    /// against `expected_value`) to `find_leaf_in_arena` starting at the root.
    fn find_leaf(
        &self,
        full_path: &Nibbles,
        expected_value: Option<&Vec<u8>>,
    ) -> Result<LeafLookup, LeafLookupError> {
        Self::find_leaf_in_arena(&self.upper_arena, self.root, full_path, 0, expected_value)
    }
2587
2588    fn updates_ref(&self) -> Cow<'_, SparseTrieUpdates> {
2589        self.updates.as_ref().map_or(Cow::Owned(SparseTrieUpdates::default()), Cow::Borrowed)
2590    }
2591
2592    fn take_updates(&mut self) -> SparseTrieUpdates {
2593        match self.updates.take() {
2594            Some(updates) => {
2595                self.updates = Some(SparseTrieUpdates::with_capacity(
2596                    updates.updated_nodes.len(),
2597                    updates.removed_nodes.len(),
2598                ));
2599                updates
2600            }
2601            None => SparseTrieUpdates::default(),
2602        }
2603    }
2604
2605    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2606    fn wipe(&mut self) {
2607        trace!(target: TRACE_TARGET, "Wiping arena trie");
2608        self.clear();
2609        self.updates = self.updates.is_some().then(SparseTrieUpdates::wiped);
2610    }
2611
2612    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2613    fn clear(&mut self) {
2614        #[cfg(feature = "trie-debug")]
2615        self.debug_recorder.reset();
2616
2617        self.upper_arena = SlotMap::new();
2618        self.root = self.upper_arena.insert(ArenaSparseNode::EmptyRoot);
2619        if let Some(updates) = self.updates.as_mut() {
2620            updates.clear()
2621        }
2622        self.buffers.clear();
2623    }
2624
    /// Intentional no-op for this implementation.
    fn shrink_nodes_to(&mut self, _size: usize) {
        // Subtries are recreated from scratch on each block, so there is nothing to shrink.
    }
2628
    /// Intentional no-op for this implementation.
    fn shrink_values_to(&mut self, _size: usize) {
        // Values are stored inline in nodes; no separate value storage.
    }
2632
2633    fn size_hint(&self) -> usize {
2634        self.upper_arena
2635            .iter()
2636            .map(|(_, node)| match node {
2637                ArenaSparseNode::Subtrie(s) => s.num_leaves as usize,
2638                ArenaSparseNode::Leaf { .. } => 1,
2639                _ => 0,
2640            })
2641            .sum()
2642    }
2643
2644    fn memory_size(&self) -> usize {
2645        let slot_size = slotmap_slot_size::<ArenaSparseNode>();
2646
2647        // Upper arena: capacity × slot size.
2648        let upper = self.upper_arena.capacity() * slot_size;
2649
2650        // Active subtries: cached total memory from last prune.
2651        let subtrie_size: usize = self
2652            .upper_arena
2653            .iter()
2654            .filter_map(|(_, node)| match node {
2655                ArenaSparseNode::Subtrie(s) => Some(s.memory_size()),
2656                _ => None,
2657            })
2658            .sum();
2659
2660        // RLP buffers.
2661        let buffer_size = self.buffers.rlp_buf.capacity() +
2662            self.buffers.rlp_node_buf.capacity() * core::mem::size_of::<RlpNode>();
2663
2664        upper + subtrie_size + buffer_size
2665    }
2666
    /// Prunes every node whose subtree contains none of `retained_leaves`,
    /// blinding the pruned edges with their cached RLP. Returns the number of
    /// leaves removed. Large subtries are pruned in parallel.
    #[instrument(
        level = "trace",
        target = TRACE_TARGET,
        skip_all,
        fields(num_retained_leaves = retained_leaves.len()),
    )]
    fn prune(&mut self, retained_leaves: &[Nibbles]) -> usize {
        // Only descend if the root is a branch; otherwise there are no subtries.
        if !matches!(&self.upper_arena[self.root], ArenaSparseNode::Branch(_)) {
            return 0;
        }

        // `prefix_range` requires sorted input; sorting also lets subtrie ranges be
        // consumed left-to-right via `retained_idx` below.
        let mut retained_leaves = retained_leaves.to_vec();
        retained_leaves.sort_unstable();

        let threshold = self.parallelism_thresholds.min_leaves_for_prune;

        // Take the cursor out to avoid borrow conflicts with `self`.
        let mut cursor = mem::take(&mut self.buffers.cursor);
        cursor.reset(&self.upper_arena, self.root, Nibbles::default());

        // Subtries taken for parallel pruning: (arena_index, subtrie, retained_range).
        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>, core::ops::Range<usize>)> = Vec::new();

        let mut pruned = 0;
        let mut retained_idx = 0;

        loop {
            // Visit every branch, subtrie, and leaf in the upper trie.
            let result = cursor.next(&mut self.upper_arena, |_, child| {
                matches!(
                    child,
                    ArenaSparseNode::Branch(_) |
                        ArenaSparseNode::Subtrie(_) |
                        ArenaSparseNode::Leaf { .. }
                )
            });

            match result {
                NextResult::Done => break,
                NextResult::NonBranch | NextResult::Branch => {}
            }

            let head = cursor.head().expect("cursor is non-empty");
            let head_idx = head.index;
            let head_path = head.path;

            match &self.upper_arena[head_idx] {
                ArenaSparseNode::Branch(_) | ArenaSparseNode::Leaf { .. } => {
                    // Don't prune the root.
                    if cursor.depth() == 0 {
                        continue;
                    }

                    // The node covers all leaves under its full logical prefix:
                    // cursor path plus the node's short key.
                    let short_key =
                        self.upper_arena[head_idx].short_key().expect("must be branch or leaf");
                    let mut node_prefix = head_path;
                    node_prefix.extend(short_key);

                    // NOTE(review): this lookup scans from index 0 while the Subtrie arm
                    // below resumes from `retained_idx` — confirm branch/leaf prefixes
                    // legitimately need the full range here.
                    let range = prefix_range(&retained_leaves, 0, &node_prefix);
                    if !range.is_empty() {
                        continue;
                    }

                    // Pruning requires replacing the parent edge with a blinded hash. If this
                    // node has no cached RLP yet (dirty), skip pruning it in this pass.
                    let Some(cached_rlp) = self.upper_arena[head_idx]
                        .state_ref()
                        .and_then(|state| state.cached_rlp_node())
                        .cloned()
                    else {
                        continue;
                    };

                    Self::remove_pruned_node(
                        &mut self.upper_arena,
                        &cursor,
                        head_idx,
                        head_path.last(),
                        cached_rlp,
                    );
                    pruned += 1;
                }
                ArenaSparseNode::Subtrie(_) => {
                    // Resume the retained-key scan where the previous subtrie's
                    // range ended (ranges are consumed in walk order).
                    let subtrie_range = prefix_range(&retained_leaves, retained_idx, &head_path);
                    retained_idx = subtrie_range.end;

                    if subtrie_range.is_empty() {
                        // No retained leaves under this subtrie: remove it wholesale.
                        // A subtrie can only be removed if its root has a cached RLP to install
                        // as a blinded child in the parent branch.
                        let Some(cached_rlp) = self.upper_arena[head_idx]
                            .state_ref()
                            .and_then(|state| state.cached_rlp_node())
                            .cloned()
                        else {
                            continue;
                        };

                        let removed = Self::remove_pruned_node(
                            &mut self.upper_arena,
                            &cursor,
                            head_idx,
                            head_path.last(),
                            cached_rlp,
                        );
                        let ArenaSparseNode::Subtrie(s) = &removed else { unreachable!() };
                        pruned += s.num_leaves as usize;
                        // Hand the removed subtrie back for allocation reuse.
                        self.recycle_subtrie(removed);
                        continue;
                    }

                    let ArenaSparseNode::Subtrie(subtrie) = &self.upper_arena[head_idx] else {
                        unreachable!()
                    };
                    if subtrie.num_leaves >= threshold {
                        // Large subtrie: take it out for parallel pruning below.
                        let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
                            &mut self.upper_arena[head_idx],
                            ArenaSparseNode::TakenSubtrie,
                        ) else {
                            unreachable!()
                        };
                        taken.push((head_idx, subtrie, subtrie_range));
                    } else {
                        // Small subtrie: prune inline with its slice of retained keys.
                        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[head_idx]
                        else {
                            unreachable!()
                        };
                        pruned += subtrie.prune(&retained_leaves[subtrie_range]);
                    }
                }
                _ => unreachable!("NonBranch in prune walk must be Subtrie, Leaf, or Branch"),
            }
        }

        self.buffers.cursor = cursor;

        if !taken.is_empty() {
            // Prune taken subtries, in parallel if more than one.
            if taken.len() == 1 {
                let (_, ref mut subtrie, ref range) = taken[0];
                pruned += subtrie.prune(&retained_leaves[range.clone()]);
            } else {
                use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};

                pruned += taken
                    .par_iter_mut()
                    .map(|(_, subtrie, range)| subtrie.prune(&retained_leaves[range.clone()]))
                    .sum::<usize>();
            }

            // Restore taken subtries into the upper arena.
            for (child_idx, subtrie, _) in taken {
                self.upper_arena[child_idx] = ArenaSparseNode::Subtrie(subtrie);
            }
        }

        // Compact slot storage after removals so freed slots don't accumulate.
        if pruned > 0 {
            compact_arena(&mut self.upper_arena, &mut self.root);
        }

        #[cfg(feature = "trie-debug")]
        self.record_initial_state();

        pruned
    }
2830
    /// Applies a batch of leaf updates to the trie, draining `updates` as entries are
    /// consumed.
    ///
    /// Updates are sorted lexicographically by nibble path and walked with a cursor over
    /// the upper arena. Consecutive updates that fall under the same revealed subtrie are
    /// forwarded to that subtrie — inline, or, when the run reaches
    /// `parallelism_thresholds.min_updates` and cannot empty the subtrie, by taking the
    /// subtrie out of the arena so all taken subtries can be processed in parallel via
    /// rayon and restored afterwards.
    ///
    /// Any update that hits a blinded node (or whose removal would collapse a branch onto
    /// a blinded sibling) is re-inserted into `updates` and reported through
    /// `proof_required_fn` with the key and the minimum proof path length in nibbles; the
    /// caller reveals the requested proofs and invokes this method again.
    #[instrument(
        level = "trace",
        target = TRACE_TARGET,
        skip_all,
        fields(num_updates = updates.len()),
    )]
    fn update_leaves(
        &mut self,
        updates: &mut B256Map<LeafUpdate>,
        mut proof_required_fn: impl FnMut(B256, u8),
    ) -> SparseTrieResult<()> {
        if updates.is_empty() {
            return Ok(());
        }

        // Snapshot the incoming updates for the debug recorder before they are drained.
        #[cfg(feature = "trie-debug")]
        let recorded_updates: Vec<_> =
            updates.iter().map(|(k, v)| (*k, LeafUpdateRecord::from(v))).collect();
        #[cfg(feature = "trie-debug")]
        let mut recorded_proof_targets: Vec<(B256, u8)> = Vec::new();

        // Drain and sort updates lexicographically by nibbles path.
        let mut sorted: Vec<_> =
            updates.drain().map(|(key, update)| (key, Nibbles::unpack(key), update)).collect();
        sorted.sort_unstable_by_key(|entry| entry.1);

        let threshold = self.parallelism_thresholds.min_updates;

        // Reuse the cached cursor buffer; it is returned to `self.buffers` below.
        let mut cursor = mem::take(&mut self.buffers.cursor);
        cursor.reset(&self.upper_arena, self.root, Nibbles::default());

        // Subtries taken for parallel processing: (arena_index, subtrie, update_range).
        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>, core::ops::Range<usize>)> = Vec::new();

        let mut update_idx = 0;
        while update_idx < sorted.len() {
            let (key, ref full_path, ref update) = sorted[update_idx];

            let find_result = cursor.seek(&mut self.upper_arena, full_path);

            match find_result {
                // Blinded — request a proof regardless of update type.
                SeekResult::Blinded => {
                    // Request a proof one nibble deeper than the deepest known branch,
                    // capped at the full 64-nibble key length.
                    let logical_len = cursor.head_logical_branch_path_len(&self.upper_arena);
                    let min_len = (logical_len as u8 + 1).min(64);
                    trace!(target: TRACE_TARGET, ?key, min_len, "Update hit blinded node, requesting proof");
                    proof_required_fn(key, min_len);
                    #[cfg(feature = "trie-debug")]
                    recorded_proof_targets.push((key, min_len));
                    // Put the update back so the caller retries it after revealing.
                    updates.insert(key, update.clone());
                }
                // Subtrie — forward all consecutive updates under this subtrie's prefix.
                SeekResult::RevealedSubtrie => {
                    let subtrie_entry = cursor.head().expect("cursor is non-empty");
                    let child_idx = subtrie_entry.index;
                    let subtrie_root_path = subtrie_entry.path;

                    // Advance past every sorted update sharing the subtrie's prefix.
                    let subtrie_start = update_idx;
                    while update_idx < sorted.len() &&
                        sorted[update_idx].1.starts_with(&subtrie_root_path)
                    {
                        update_idx += 1;
                    }

                    let subtrie_updates = &sorted[subtrie_start..update_idx];

                    // Edge-case: if all updates are removals that could empty the
                    // subtrie and collapse the parent onto a blinded sibling, request
                    // a proof for the sibling and skip the subtrie's updates.
                    if let Some(proof) = Self::check_subtrie_collapse_needs_proof(
                        &self.upper_arena,
                        &cursor,
                        subtrie_updates,
                    ) {
                        trace!(target: TRACE_TARGET, proof_key = ?proof.key, proof_min_len = proof.min_len, "Subtrie collapse would need blinded sibling, requesting proof");
                        proof_required_fn(proof.key, proof.min_len);
                        #[cfg(feature = "trie-debug")]
                        recorded_proof_targets.push((proof.key, proof.min_len));
                        for &(key, _, ref update) in subtrie_updates {
                            updates.insert(key, update.clone());
                        }
                        // Pop the subtrie entry before continuing.
                        // NOTE(review): no explicit pop happens on this path — the next
                        // `cursor.seek` / final `cursor.drain` appear to reposition the
                        // cursor instead; confirm that satisfies the comment's intent.
                        continue;
                    }

                    let num_subtrie_updates = update_idx - subtrie_start;

                    // If all updates are removals and could empty the subtrie,
                    // force inline processing so the upper-arena collapse logic
                    // can detect blinded siblings and request proofs.
                    let all_removals = subtrie_updates
                        .iter()
                        // Filter out Touched, as they don't affect the structure of the trie. So an
                        // update set with 2 removals and one Touched could still result in an empty
                        // sub trie.
                        .filter(|(_, _, u)| matches!(u, LeafUpdate::Changed(_)))
                        .all(|(_, _, u)| matches!(u, LeafUpdate::Changed(v) if v.is_empty()));
                    let subtrie_num_leaves = match &self.upper_arena[child_idx] {
                        ArenaSparseNode::Subtrie(s) => s.num_leaves,
                        _ => 0,
                    };
                    let might_empty_subtrie =
                        all_removals && num_subtrie_updates as u64 >= subtrie_num_leaves;

                    if num_subtrie_updates >= threshold && !might_empty_subtrie {
                        // Take subtrie for parallel update.
                        trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Taking subtrie for parallel update");
                        let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
                            &mut self.upper_arena[child_idx],
                            ArenaSparseNode::TakenSubtrie,
                        ) else {
                            unreachable!()
                        };
                        taken.push((child_idx, subtrie, subtrie_start..update_idx));
                    } else {
                        // Update inline.
                        trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Updating subtrie inline");
                        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[child_idx]
                        else {
                            unreachable!()
                        };

                        subtrie.update_leaves(subtrie_updates);

                        // Forward proofs the subtrie requires and re-queue the
                        // corresponding updates for a retry after revealing.
                        for (target_idx, proof) in subtrie.required_proofs.drain(..) {
                            proof_required_fn(proof.key, proof.min_len);
                            #[cfg(feature = "trie-debug")]
                            recorded_proof_targets.push((proof.key, proof.min_len));
                            let (key, _, ref update) = subtrie_updates[target_idx];
                            updates.insert(key, update.clone());
                        }

                        // Check if the subtrie's root became empty after updates.
                        self.maybe_unwrap_subtrie(&mut cursor);
                    }

                    // Don't increment update_idx — already advanced past subtrie updates.
                    continue;
                }
                // EmptyRoot, leaf, diverged branch, or empty child slot — upsert directly.
                find_result @ (SeekResult::EmptyRoot |
                SeekResult::RevealedLeaf |
                SeekResult::Diverged |
                SeekResult::NoChild { .. }) => match update {
                    LeafUpdate::Changed(v) if !v.is_empty() => {
                        let (result, _deltas) = Self::upsert_leaf(
                            &mut self.upper_arena,
                            &mut cursor,
                            &mut self.root,
                            full_path,
                            v,
                            find_result,
                        );
                        match result {
                            UpsertLeafResult::NewChild => {
                                let head = cursor.head().expect("cursor is non-empty");
                                if Self::should_be_subtrie(head.path.len()) {
                                    // The new child itself sits at the subtrie
                                    // boundary — wrap it directly.
                                    self.maybe_wrap_in_subtrie(head.index, &head.path);
                                } else {
                                    // The new child is above the boundary (e.g. a
                                    // split at depth 1 creates children at depth 2).
                                    // Wrap any of its children that land there.
                                    self.maybe_wrap_branch_children(&cursor);
                                }
                            }
                            UpsertLeafResult::NewLeaf => {
                                // A root-level split may create children at the
                                // subtrie boundary depth. Wrap them.
                                self.maybe_wrap_branch_children(&cursor);
                            }
                            UpsertLeafResult::Updated => {}
                        }
                    }
                    // Empty value — this update is a removal.
                    LeafUpdate::Changed(_) => {
                        let (result, _deltas) = Self::remove_leaf(
                            &mut self.upper_arena,
                            &mut cursor,
                            &mut self.root,
                            key,
                            full_path,
                            find_result,
                            &mut self.buffers.updates,
                        );
                        match result {
                            RemoveLeafResult::NeedsProof { key, proof_key, min_len } => {
                                proof_required_fn(proof_key, min_len);
                                #[cfg(feature = "trie-debug")]
                                recorded_proof_targets.push((proof_key, min_len));
                                // Re-queue the removal and neutralize the sorted slot so
                                // it is not applied twice.
                                let update =
                                    mem::replace(&mut sorted[update_idx].2, LeafUpdate::Touched);
                                updates.insert(key, update);
                            }
                            RemoveLeafResult::Removed => {
                                // remove_leaf may have called collapse_branch, which
                                // can leave structural invariants violated:
                                // 1. A branch with 0-1 children that needs further collapse or
                                //    removal.
                                // 2. A Subtrie at a depth shallower than UPPER_TRIE_MAX_DEPTH that
                                //    needs unwrapping.
                                // 3. A non-Subtrie node at UPPER_TRIE_MAX_DEPTH that needs
                                //    wrapping.
                                self.maybe_collapse_or_remove_branch(&mut cursor);
                                let head =
                                    cursor.head().expect("cursor always has root after collapse");
                                self.maybe_wrap_in_subtrie(head.index, &head.path);
                            }
                            RemoveLeafResult::NotFound => {}
                        }
                    }
                    // Touched updates don't change trie structure or values.
                    LeafUpdate::Touched => {}
                },
            }

            update_idx += 1;
        }

        // Drain remaining cursor entries from the upper-trie walk.
        cursor.drain(&mut self.upper_arena);
        self.buffers.cursor = cursor;

        if taken.is_empty() {
            #[cfg(debug_assertions)]
            self.debug_assert_subtrie_structure();

            #[cfg(feature = "trie-debug")]
            self.debug_recorder.record(RecordedOp::UpdateLeaves {
                updates: recorded_updates,
                proof_targets: recorded_proof_targets,
            });

            return Ok(());
        }

        // Apply updates to taken subtries, in parallel if more than one.
        if taken.len() == 1 {
            let (_, ref mut subtrie, ref range) = taken[0];
            subtrie.update_leaves(&sorted[range.clone()]);
        } else {
            use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};

            taken.par_iter_mut().for_each(|(_, subtrie, range)| {
                subtrie.update_leaves(&sorted[range.clone()]);
            });
        }

        // Collect subtrie paths before consuming `taken`, then restore subtries and
        // process required proofs.
        let taken_paths: Vec<Nibbles> = taken.iter().map(|(_, s, _)| s.path).collect();
        for (child_idx, mut subtrie, range) in taken {
            let subtrie_updates = &sorted[range];
            for (target_idx, proof) in subtrie.required_proofs.drain(..) {
                proof_required_fn(proof.key, proof.min_len);
                #[cfg(feature = "trie-debug")]
                recorded_proof_targets.push((proof.key, proof.min_len));
                let (key, _, ref update) = subtrie_updates[target_idx];
                updates.insert(key, update.clone());
            }

            // Restore the subtrie into the upper arena.
            self.upper_arena[child_idx] = ArenaSparseNode::Subtrie(subtrie);
        }

        // Navigate to each taken subtrie via seek to propagate dirty state
        // through intermediate branches. Taken subtries are guaranteed not to
        // become EmptyRoot (the would-empty-subtrie check above forces those
        // inline), so we only need to handle sibling collapses that may have
        // occurred during inline processing while this subtrie was taken.
        {
            let mut cursor = mem::take(&mut self.buffers.cursor);
            cursor.reset(&self.upper_arena, self.root, Nibbles::default());

            for path in &taken_paths {
                let find_result = cursor.seek(&mut self.upper_arena, path);
                match find_result {
                    SeekResult::RevealedSubtrie => {
                        debug_assert!(
                            {
                                let head_idx = cursor.head().expect("cursor is non-empty").index;
                                !matches!(
                                    &self.upper_arena[head_idx],
                                    ArenaSparseNode::Subtrie(s) if matches!(s.arena[s.root], ArenaSparseNode::EmptyRoot)
                                )
                            },
                            "taken subtrie became EmptyRoot — should have been forced inline"
                        );

                        cursor.pop(&mut self.upper_arena);

                        // The parent branch (now at cursor top) may have had a sibling
                        // removed during inline processing while this subtrie was taken.
                        // Handle any necessary collapse or removal.
                        self.maybe_collapse_or_remove_branch(&mut cursor);
                    }
                    _ => {
                        // Subtrie was already unwrapped by a prior collapse; dirty state
                        // was propagated during that collapse. Nothing to do.
                    }
                }
            }

            cursor.drain(&mut self.upper_arena);
            self.buffers.cursor = cursor;
        }

        #[cfg(debug_assertions)]
        self.debug_assert_subtrie_structure();

        #[cfg(feature = "trie-debug")]
        self.debug_recorder.record(RecordedOp::UpdateLeaves {
            updates: recorded_updates,
            proof_targets: recorded_proof_targets,
        });

        Ok(())
    }
3148
3149    #[cfg(feature = "trie-debug")]
3150    fn take_debug_recorder(&mut self) -> TrieDebugRecorder {
3151        core::mem::take(&mut self.debug_recorder)
3152    }
3153
    /// Hook invoked with the computed branch-node updates after a commit.
    ///
    /// Intentionally a no-op for the arena trie: its updates are tracked internally
    /// rather than applied through this callback.
    fn commit_updates(
        &mut self,
        _updated: &HashMap<Nibbles, BranchNodeCompact>,
        _removed: &HashSet<Nibbles>,
    ) {
        // no-op for arena trie
    }
3161}
3162
3163#[cfg(test)]
3164mod tests {
3165    use super::TRACE_TARGET;
3166    use crate::{ArenaParallelSparseTrie, ArenaParallelismThresholds, LeafUpdate, SparseTrie};
3167    use alloy_primitives::{map::B256Map, B256, U256};
3168    use rand::{seq::SliceRandom, Rng, SeedableRng};
3169    use reth_trie::test_utils::TrieTestHarness;
3170    use reth_trie_common::{Nibbles, ProofV2Target};
3171    use std::collections::BTreeMap;
3172    use tracing::{info, trace};
3173
    /// Test harness for proptest-based arena sparse trie testing.
    ///
    /// Wraps [`TrieTestHarness`] and adds `ArenaParallelSparseTrie`-specific helpers for
    /// the reveal-update loop and asserting that sparse trie updates match `StorageRoot`.
    struct ArenaTrieTestHarness {
        /// The inner general-purpose harness.
        inner: TrieTestHarness,
    }

    // Deref to the inner harness so tests can call `TrieTestHarness` methods
    // (e.g. `root_node`, `proof_v2`, `storage`) directly on the wrapper.
    impl std::ops::Deref for ArenaTrieTestHarness {
        type Target = TrieTestHarness;
        fn deref(&self) -> &Self::Target {
            &self.inner
        }
    }

    impl std::ops::DerefMut for ArenaTrieTestHarness {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.inner
        }
    }
3195
3196    impl ArenaTrieTestHarness {
3197        /// Creates a new test harness from a map of hashed storage slots to values.
3198        fn new(storage: BTreeMap<B256, U256>) -> Self {
3199            Self { inner: TrieTestHarness::new(storage) }
3200        }
3201
3202        /// Computes the new storage root and trie updates after applying the given changes
3203        /// using both `StorageRoot` and the provided `ArenaParallelSparseTrie`, then asserts
3204        /// they match.
3205        fn assert_changes(
3206            &self,
3207            apst: &mut ArenaParallelSparseTrie,
3208            changes: BTreeMap<B256, U256>,
3209        ) {
3210            // Compute expected root and trie updates via StorageRoot.
3211            let (expected_root, mut expected_trie_updates) = if changes.is_empty() {
3212                (self.original_root(), Default::default())
3213            } else {
3214                self.get_root_with_updates(&changes)
3215            };
3216
3217            self.minimize_trie_updates(&mut expected_trie_updates);
3218
3219            // Build leaf updates for the APST: non-zero values are upserts (RLP-encoded),
3220            // zero values are deletions (empty vec).
3221            let mut leaf_updates: B256Map<LeafUpdate> = changes
3222                .iter()
3223                .map(|(&slot, &value)| {
3224                    let rlp_value = if value == U256::ZERO {
3225                        Vec::new()
3226                    } else {
3227                        alloy_rlp::encode_fixed_size(&value).to_vec()
3228                    };
3229                    (slot, LeafUpdate::Changed(rlp_value))
3230                })
3231                .collect();
3232
3233            // Reveal-update loop: call update_leaves, collect required proofs, fetch them,
3234            // reveal, and repeat until no more proofs are needed.
3235            loop {
3236                let mut targets: Vec<ProofV2Target> = Vec::new();
3237                apst.update_leaves(&mut leaf_updates, |key, min_len| {
3238                    targets.push(ProofV2Target::new(key).with_min_len(min_len));
3239                })
3240                .expect("update_leaves should succeed");
3241
3242                if targets.is_empty() {
3243                    break;
3244                }
3245
3246                let (mut proof_nodes, _) = self.proof_v2(&mut targets);
3247                apst.reveal_nodes(&mut proof_nodes).expect("reveal_nodes should succeed");
3248            }
3249
3250            // Compute root and take updates from the APST.
3251            let actual_root = apst.root();
3252            let mut actual_updates = apst.take_updates();
3253
3254            // Minimize sparse updates inline (can't use TrieTestHarness::minimize_sparse_updates
3255            // due to the crate's SparseTrieUpdates being a different type than reth-trie's copy).
3256            actual_updates.updated_nodes.retain(|path, node| {
3257                self.storage_trie_updates().storage_nodes.get(path) != Some(node)
3258            });
3259            actual_updates
3260                .removed_nodes
3261                .retain(|path| self.storage_trie_updates().storage_nodes.contains_key(path));
3262
3263            pretty_assertions::assert_eq!(
3264                expected_trie_updates.storage_nodes.into_iter().collect::<Vec<_>>().sort(),
3265                actual_updates.updated_nodes.into_iter().collect::<Vec<_>>().sort(),
3266                "updated nodes mismatch"
3267            );
3268            pretty_assertions::assert_eq!(
3269                expected_trie_updates.removed_nodes.into_iter().collect::<Vec<_>>().sort(),
3270                actual_updates.removed_nodes.into_iter().collect::<Vec<_>>().sort(),
3271                "removed nodes mismatch"
3272            );
3273            assert_eq!(expected_root, actual_root, "storage root mismatch");
3274        }
3275    }
3276
3277    use proptest::prelude::*;
3278    use proptest_arbitrary_interop::arb;
3279
3280    /// Builds a changeset by mixing `new_keys` (fresh insertions) with a fraction of
3281    /// existing keys from `base` (updates/deletions).
3282    ///
3283    /// `overlap_pct` controls how many existing keys are included, and `delete_pct`
3284    /// controls how many of those become deletions (zero values). The remaining
3285    /// overlap keys get random non-zero values.
3286    fn build_changeset(
3287        base: &BTreeMap<B256, U256>,
3288        new_keys: BTreeMap<B256, U256>,
3289        overlap_pct: f64,
3290        delete_pct: f64,
3291        rng: &mut rand::rngs::StdRng,
3292    ) -> BTreeMap<B256, U256> {
3293        let num_overlap = (base.len() as f64 * overlap_pct) as usize;
3294        let num_delete = (num_overlap as f64 * delete_pct) as usize;
3295
3296        let mut all_keys: Vec<B256> = base.keys().copied().collect();
3297        all_keys.shuffle(rng);
3298        let overlap_keys = &all_keys[..num_overlap];
3299
3300        let mut changeset = new_keys;
3301        for (i, &key) in overlap_keys.iter().enumerate() {
3302            let value =
3303                if i < num_delete { U256::ZERO } else { U256::from(rng.random::<u64>() | 1) };
3304            changeset.entry(key).or_insert(value);
3305        }
3306        changeset
3307    }
3308
    // End-to-end property test: applies two randomized changesets (with a prune in
    // between) and checks the arena trie's root and node updates against StorageRoot.
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]
        #[test]
        fn arena_trie_proptest(
            initial in proptest::collection::btree_map(arb::<B256>(), arb::<U256>(), 0..=100usize),
            changeset1_new_keys in proptest::collection::btree_map(arb::<B256>(), arb::<U256>(), 0..=30usize),
            changeset2_new_keys in proptest::collection::btree_map(arb::<B256>(), arb::<U256>(), 0..=30usize),
            overlap_pct in 0.0..=0.5f64,
            delete_pct in 0.0..=0.33f64, // percent of overlapping changeset which are deletes
            shuffle_seed in arb::<u64>(),
        ) {
            reth_tracing::init_test_tracing();
            info!(target: TRACE_TARGET, ?shuffle_seed, "PROPTEST START");

            // Filter out zero-valued entries from the initial dataset (zeros mean "absent").
            let initial: BTreeMap<B256, U256> = initial.into_iter()
                .filter(|(_, v)| *v != U256::ZERO)
                .collect();

            // Seed a dedicated rng so failures reproduce from `shuffle_seed`.
            let mut rng = rand::rngs::StdRng::seed_from_u64(shuffle_seed);

            let changeset1 = build_changeset(&initial, changeset1_new_keys, overlap_pct, delete_pct, &mut rng);
            for (i, (k, v)) in changeset1.iter().enumerate() {
                trace!(target: TRACE_TARGET, ?i, ?k, ?v, "Changeset 1 entry");
            }

            let mut harness = ArenaTrieTestHarness::new(initial);

            // Initialize the APST from the harness root node.
            let root_node = harness.root_node();
            // Low thresholds (3) so even small random tries exercise the parallel paths.
            let mut apst = ArenaParallelSparseTrie::default().with_parallelism_thresholds(
                ArenaParallelismThresholds {
                    min_dirty_leaves: 3,
                    min_revealed_nodes: 3,
                    min_updates: 3,
                    min_leaves_for_prune: 3,
                },
            );
            apst.set_root(root_node.node, root_node.masks, true).expect("set_root should succeed");

            harness.assert_changes(&mut apst, changeset1.clone());

            // Update the harness base dataset to reflect the first changeset.
            harness.apply_changeset(changeset1);

            // Pick N random keys from the current storage as retained leaves for pruning.
            let mut all_storage_keys: Vec<Nibbles> = harness.storage().keys()
                .map(|k| Nibbles::unpack(*k))
                .collect();
            all_storage_keys.shuffle(&mut rng);
            let num_retain = if all_storage_keys.is_empty() { 0 } else {
                rng.random_range(0..=all_storage_keys.len())
            };
            let retained: Vec<Nibbles> = all_storage_keys[..num_retain].to_vec();
            apst.prune(&retained);

            // Second changeset runs against the pruned trie to exercise re-reveals.
            let changeset2 = build_changeset(harness.storage(), changeset2_new_keys, overlap_pct, delete_pct, &mut rng);
            for (i, (k, v)) in changeset2.iter().enumerate() {
                trace!(target: TRACE_TARGET, ?i, ?k, ?v, "Changeset 2 entry");
            }

            harness.assert_changes(&mut apst, changeset2);
        }
    }
3373}