Skip to main content

reth_trie_sparse/arena/
mod.rs

1mod branch_child_idx;
2mod cursor;
3mod nodes;
4
5use branch_child_idx::{BranchChildIdx, BranchChildIter};
6use cursor::{ArenaCursor, NextResult, SeekResult};
7use nodes::{
8    ArenaSparseNode, ArenaSparseNodeBranch, ArenaSparseNodeBranchChild, ArenaSparseNodeState,
9};
10
11use crate::{LeafLookup, LeafLookupError, LeafUpdate, SparseTrie, SparseTrieUpdates};
12use alloc::{borrow::Cow, boxed::Box, collections::VecDeque, vec::Vec};
13use alloy_primitives::{keccak256, map::B256Map, B256};
14use alloy_trie::TrieMask;
15use core::{cmp::Reverse, mem};
16use reth_execution_errors::SparseTrieResult;
17use reth_trie_common::{
18    BranchNodeMasks, BranchNodeRef, ExtensionNodeRef, LeafNodeRef, Nibbles, ProofTrieNodeV2,
19    RlpNode, TrieNodeV2, EMPTY_ROOT_HASH,
20};
21use slotmap::{DefaultKey, SlotMap};
22use smallvec::SmallVec;
23use tracing::{instrument, trace};
24
25#[cfg(feature = "trie-debug")]
26use crate::debug_recorder::{LeafUpdateRecord, ProofTrieNodeRecord, RecordedOp, TrieDebugRecorder};
27
28/// Alias for the slotmap key type used as node references throughout the arena trie.
29type Index = DefaultKey;
30/// Alias for the slotmap used as the node arena throughout the arena trie.
31type NodeArena = SlotMap<Index, ArenaSparseNode>;
32
33const TRACE_TARGET: &str = "trie::arena";
34
35/// The maximum path length (in nibbles) for nodes that live in the upper trie. Nodes at this
36/// depth or deeper belong to lower subtries.
37const UPPER_TRIE_MAX_DEPTH: usize = 2;
38
39/// Finds the sub-range of `sorted_keys[start..]` whose entries start with `prefix`.
40///
41/// Returns the half-open range `start_idx..end_idx` into `sorted_keys`. The returned
42/// `end_idx` can be used as the `start` for the next call when iterating prefixes in
43/// lexicographic order.
44fn prefix_range(
45    sorted_keys: &[Nibbles],
46    start: usize,
47    prefix: &Nibbles,
48) -> core::ops::Range<usize> {
49    // Advance past entries before `prefix`.
50    let begin = start + sorted_keys[start..].partition_point(|p| p < prefix);
51    // Find the end of entries that start with `prefix`.
52    let mut end = begin;
53    while end < sorted_keys.len() && sorted_keys[end].starts_with(prefix) {
54        end += 1;
55    }
56    begin..end
57}
58
59/// Returns the per-slot byte size used by `SlotMap<_, T>`. `SlotMap` wraps each value in a
60/// `Slot<T>` containing the value union + a 4-byte version field, with struct alignment.
61const fn slotmap_slot_size<T>() -> usize {
62    // Slot<T> = { u: SlotUnion<T>, version: u32 }
63    // SlotUnion<T> = union { value: ManuallyDrop<T>, next_free: u32 }
64    // size = max(size_of::<T>(), 4) + 4, rounded up to align_of::<T>() (or 4)
65    let union_size = if core::mem::size_of::<T>() > 4 { core::mem::size_of::<T>() } else { 4 };
66    let raw = union_size + 4;
67    let align = if core::mem::align_of::<T>() > 4 { core::mem::align_of::<T>() } else { 4 };
68    (raw + align - 1) & !(align - 1)
69}
70
71/// Compacts an arena by BFS-copying all reachable nodes into a fresh `SlotMap`, dropping
72/// unreachable (pruned) slots. Parents are stored before children for cache-friendly top-down
73/// traversal.
74fn compact_arena(arena: &mut NodeArena, root: &mut Index) {
75    let mut new_arena = SlotMap::with_capacity(arena.len());
76    let mut queue = VecDeque::new();
77
78    let root_node = arena.remove(*root).expect("root exists");
79    let new_root = new_arena.insert(root_node);
80    queue.push_back(new_root);
81
82    while let Some(new_idx) = queue.pop_front() {
83        // Invariant: any node popped from `queue` has been moved into `new_arena` but
84        // its Branch.children have not been rewritten yet — every Revealed(idx) here is
85        // still an old-arena index, and the child is still present in `arena` because
86        // only this parent's iteration can remove it (each child has exactly one parent).
87        let old_children: SmallVec<[(usize, Index); 16]> = match &new_arena[new_idx] {
88            ArenaSparseNode::Branch(b) => b
89                .children
90                .iter()
91                .enumerate()
92                .filter_map(|(i, c)| match c {
93                    ArenaSparseNodeBranchChild::Revealed(old_idx) => Some((i, *old_idx)),
94                    _ => None,
95                })
96                .collect(),
97            _ => continue,
98        };
99
100        for (child_pos, old_child_idx) in old_children {
101            let child_node = arena.remove(old_child_idx).expect("child exists");
102            let new_child_idx = new_arena.insert(child_node);
103            let ArenaSparseNode::Branch(b) = &mut new_arena[new_idx] else { unreachable!() };
104            b.children[child_pos] = ArenaSparseNodeBranchChild::Revealed(new_child_idx);
105            queue.push_back(new_child_idx);
106        }
107    }
108
109    debug_assert!(
110        arena.is_empty(),
111        "compact_arena: {} orphaned nodes remaining after BFS drain",
112        arena.len(),
113    );
114
115    *arena = new_arena;
116    *root = new_root;
117}
118
119/// Reusable buffers shared by both [`ArenaSparseSubtrie`] and [`ArenaParallelSparseTrie`].
120#[derive(Debug, Default, Clone)]
121struct ArenaTrieBuffers {
122    /// Reusable cursor for trie traversals.
123    cursor: ArenaCursor,
124    /// Trie updates built up directly during hashing and structural changes. `Some` when
125    /// tracking updates, `None` otherwise. Initialized alongside `updates` in `set_updates`.
126    updates: Option<SparseTrieUpdates>,
127    /// Reusable buffer for RLP encoding.
128    rlp_buf: Vec<u8>,
129    /// Reusable buffer for child `RlpNode`s during hashing.
130    rlp_node_buf: Vec<RlpNode>,
131}
132
133impl ArenaTrieBuffers {
134    fn clear(&mut self) {
135        if let Some(updates) = self.updates.as_mut() {
136            updates.clear();
137        }
138        self.rlp_buf.clear();
139        self.rlp_node_buf.clear();
140    }
141}
142
143/// A subtrie within the arena-based parallel sparse trie.
144///
145/// Each subtrie owns its own arena, allowing parallel mutations across subtries.
146#[derive(Debug, Clone)]
147struct ArenaSparseSubtrie {
148    /// The arena allocating nodes within this subtrie.
149    arena: NodeArena,
150    /// The root node of this subtrie.
151    root: Index,
152    /// The absolute path of this subtrie's root in the full trie.
153    path: Nibbles,
154    /// Reusable buffers for traversal, RLP encoding, and update actions.
155    buffers: ArenaTrieBuffers,
156    /// Reusable buffer for collecting required proofs during leaf updates.
157    /// Each entry is `(index, proof)` where `index` is the position of the target in the
158    /// `sorted_updates` slice passed to [`Self::update_leaves`].
159    required_proofs: Vec<(usize, ArenaRequiredProof)>,
160    /// Total number of revealed leaves in this subtrie.
161    num_leaves: u64,
162    /// Number of dirty (modified since last hash) leaves in this subtrie.
163    num_dirty_leaves: u64,
164    /// Cached total memory footprint of this subtrie in bytes. Computed during prune;
165    /// starts at 0 before the first prune.
166    cached_memory_size: usize,
167}
168
169impl ArenaSparseSubtrie {
170    /// Creates a new subtrie with a pre-allocated root slot containing
171    /// [`ArenaSparseNode::EmptyRoot`]. The caller must overwrite `subtrie.arena[subtrie.root]`
172    /// before use.
173    fn new(record_updates: bool) -> Box<Self> {
174        let mut arena = SlotMap::new();
175        let root = arena.insert(ArenaSparseNode::EmptyRoot);
176        let buffers = ArenaTrieBuffers {
177            updates: record_updates.then(SparseTrieUpdates::default),
178            ..Default::default()
179        };
180        Box::new(Self {
181            arena,
182            root,
183            path: Nibbles::default(),
184            buffers,
185            required_proofs: Vec::new(),
186            num_leaves: 0,
187            num_dirty_leaves: 0,
188            cached_memory_size: 0,
189        })
190    }
191
192    /// Returns the cached total memory footprint of this subtrie in bytes.
193    /// Accurate after prune; returns 0 before the first prune.
194    const fn memory_size(&self) -> usize {
195        self.cached_memory_size
196    }
197
198    /// Asserts that `num_leaves` and `num_dirty_leaves` match the actual counts in the arena.
199    #[cfg(debug_assertions)]
200    fn debug_assert_counters(&self) {
201        let (actual_leaves, actual_dirty) =
202            ArenaParallelSparseTrie::count_leaves_and_dirty(&self.arena, self.root);
203        debug_assert_eq!(
204            self.num_leaves, actual_leaves,
205            "subtrie {:?} num_leaves mismatch: stored {} vs actual {}",
206            self.path, self.num_leaves, actual_leaves,
207        );
208        debug_assert_eq!(
209            self.num_dirty_leaves, actual_dirty,
210            "subtrie {:?} num_dirty_leaves mismatch: stored {} vs actual {}",
211            self.path, self.num_dirty_leaves, actual_dirty,
212        );
213    }
214
215    /// Prunes revealed subtrees that are not ancestors of any retained leaf, compacting the
216    /// arena in a single BFS pass.
217    ///
218    /// `retained_leaves` must be sorted and scoped to this subtrie's key range. Builds a fresh
219    /// arena by BFS-copying only retained nodes from the root, blinding non-retained children
220    /// at the boundary. Non-retained subtrees are never visited — they are dropped with the
221    /// old arena.
222    ///
223    /// Expects that all nodes have computed hashes (i.e. `prune` is called after hashing).
224    fn prune(&mut self, retained_leaves: &[Nibbles]) -> usize {
225        // Only branches can have pruneable children.
226        if !matches!(&self.arena[self.root], ArenaSparseNode::Branch(_)) {
227            return 0;
228        }
229
230        debug_assert!(
231            retained_leaves.windows(2).all(|w| w[0] <= w[1]),
232            "retained_leaves must be sorted"
233        );
234        debug_assert_eq!(self.num_dirty_leaves, 0, "prune must run after hashing");
235
236        let old_count = self.arena.len();
237        // In a tree where every branch has ≥2 children, #branches ≤ #leaves − 1, so
238        // total nodes ≤ 2N − 1. This is a reasonable upper-bound capacity hint that
239        // avoids most reallocations without over-allocating when pruning is heavy.
240        let mut new_arena = SlotMap::with_capacity(retained_leaves.len() * 2);
241        // Queue: (new_idx, path TO the node — excluding its own short_key)
242        let mut queue: VecDeque<(Index, Nibbles)> = VecDeque::new();
243        let mut new_num_leaves = 0u64;
244        let mut new_nodes_heap_size = 0usize;
245
246        // Root is always retained.
247        let root_node = self.arena.remove(self.root).expect("root exists");
248        let new_root = new_arena.insert(root_node);
249        queue.push_back((new_root, self.path));
250
251        while let Some((new_idx, node_path)) = queue.pop_front() {
252            new_nodes_heap_size += new_arena[new_idx].extra_heap_bytes();
253
254            let ArenaSparseNode::Branch(b) = &new_arena[new_idx] else {
255                if matches!(&new_arena[new_idx], ArenaSparseNode::Leaf { .. }) {
256                    new_num_leaves += 1;
257                }
258                continue;
259            };
260
261            // Logical path of this branch (path TO node + its extension/short_key).
262            let mut branch_logical_path = node_path;
263            branch_logical_path.extend(&b.short_key);
264
265            // Collect (dense_pos, nibble, old_child_idx) for revealed children.
266            let children: SmallVec<[(usize, u8, Index); 16]> = BranchChildIter::new(b.state_mask)
267                .filter_map(|(dense_idx, nibble)| match &b.children[dense_idx] {
268                    ArenaSparseNodeBranchChild::Revealed(old_idx) => {
269                        Some((dense_idx.get(), nibble, *old_idx))
270                    }
271                    _ => None,
272                })
273                .collect();
274
275            for (child_pos, nibble, old_child_idx) in children {
276                // Child's path in the trie (edges to reach it, excluding its own short_key).
277                let mut child_path = branch_logical_path;
278                child_path.push(nibble);
279
280                // Child's full prefix for the retention check.
281                let child_short_key = match &self.arena[old_child_idx] {
282                    ArenaSparseNode::Branch(b) => &b.short_key,
283                    ArenaSparseNode::Leaf { key, .. } => key,
284                    other => unreachable!("subtrie prune: unexpected child node kind: {other:?}"),
285                };
286                let mut child_prefix = child_path;
287                child_prefix.extend(child_short_key);
288
289                if has_prefix(retained_leaves, &child_prefix) {
290                    // Retained — move child to new arena.
291                    let child_node = self.arena.remove(old_child_idx).expect("child exists");
292                    let new_child_idx = new_arena.insert(child_node);
293                    let ArenaSparseNode::Branch(b) = &mut new_arena[new_idx] else {
294                        unreachable!()
295                    };
296                    b.children[child_pos] = ArenaSparseNodeBranchChild::Revealed(new_child_idx);
297                    queue.push_back((new_child_idx, child_path));
298                } else {
299                    // Not retained — blind the child slot in the new arena.
300                    let rlp_node = self.arena[old_child_idx]
301                        .state_ref()
302                        .expect("child must have state")
303                        .cached_rlp_node()
304                        .cloned()
305                        .expect("pruned child must have cached RLP (prune runs after hashing)");
306                    let ArenaSparseNode::Branch(b) = &mut new_arena[new_idx] else {
307                        unreachable!()
308                    };
309                    b.children[child_pos] = ArenaSparseNodeBranchChild::Blinded(rlp_node);
310                }
311            }
312        }
313
314        let pruned = old_count - new_arena.len();
315        self.num_leaves = new_num_leaves;
316        self.num_dirty_leaves = 0;
317        self.arena = new_arena;
318        self.root = new_root;
319        self.cached_memory_size =
320            self.arena.capacity() * slotmap_slot_size::<ArenaSparseNode>() + new_nodes_heap_size;
321
322        #[cfg(debug_assertions)]
323        self.debug_assert_counters();
324        return pruned;
325
326        /// Returns `true` if any entry in `sorted_keys` starts with `prefix`.
327        fn has_prefix(sorted_keys: &[Nibbles], prefix: &Nibbles) -> bool {
328            let idx = sorted_keys.binary_search(prefix).unwrap_or_else(|i| i);
329            sorted_keys.get(idx).is_some_and(|p| p.starts_with(prefix))
330        }
331    }
332
333    /// Applies leaf updates within this subtrie. Uses the same walk-down-with-cursor pattern as
334    /// [`Self::reveal_nodes`], but checks accessibility for [`LeafUpdate::Touched`] entries.
335    ///
336    /// `sorted_updates` must be sorted lexicographically by their nibbles path (index 1).
337    ///
338    /// Any required proofs are appended to `self.required_proofs` and should be drained by the
339    /// caller after this method returns.
340    #[instrument(
341        level = "trace",
342        target = TRACE_TARGET,
343        skip_all,
344        fields(
345            subtrie = ?self.path,
346            num_updates = sorted_updates.len(),
347        ),
348    )]
349    fn update_leaves(&mut self, sorted_updates: &[(B256, Nibbles, LeafUpdate)]) {
350        if sorted_updates.is_empty() {
351            return;
352        }
353        trace!(target: TRACE_TARGET, "Subtrie update_leaves");
354
355        debug_assert!(
356            !matches!(self.arena[self.root], ArenaSparseNode::EmptyRoot),
357            "subtrie root must not be EmptyRoot at start of update_leaves"
358        );
359
360        self.buffers.cursor.reset(&self.arena, self.root, self.path);
361
362        for (idx, &(key, ref full_path, ref update)) in sorted_updates.iter().enumerate() {
363            let find_result = self.buffers.cursor.seek(&mut self.arena, full_path);
364
365            // If the path hits a blinded node, request a proof regardless of update type.
366            if matches!(find_result, SeekResult::Blinded) {
367                let logical_len = self.buffers.cursor.head_logical_branch_path_len(&self.arena);
368                self.required_proofs.push((
369                    idx,
370                    ArenaRequiredProof { key, min_len: (logical_len as u8 + 1).min(64) },
371                ));
372                continue;
373            }
374
375            match update {
376                LeafUpdate::Changed(value) if !value.is_empty() => {
377                    // Upsert: insert or update a leaf with the given value.
378                    let (_result, deltas) = ArenaParallelSparseTrie::upsert_leaf(
379                        &mut self.arena,
380                        &mut self.buffers.cursor,
381                        &mut self.root,
382                        full_path,
383                        value,
384                        find_result,
385                    );
386                    self.num_leaves = (self.num_leaves as i64 + deltas.num_leaves_delta) as u64;
387                    self.num_dirty_leaves =
388                        (self.num_dirty_leaves as i64 + deltas.num_dirty_leaves_delta) as u64;
389                }
390                LeafUpdate::Changed(_) => {
391                    let (result, deltas) = ArenaParallelSparseTrie::remove_leaf(
392                        &mut self.arena,
393                        &mut self.buffers.cursor,
394                        &mut self.root,
395                        key,
396                        full_path,
397                        find_result,
398                        &mut self.buffers.updates,
399                    );
400                    self.num_leaves = (self.num_leaves as i64 + deltas.num_leaves_delta) as u64;
401                    self.num_dirty_leaves =
402                        (self.num_dirty_leaves as i64 + deltas.num_dirty_leaves_delta) as u64;
403
404                    if let RemoveLeafResult::NeedsProof { key, proof_key, min_len } = result {
405                        self.required_proofs
406                            .push((idx, ArenaRequiredProof { key: proof_key, min_len }));
407                        self.required_proofs.push((idx, ArenaRequiredProof { key, min_len }));
408                    }
409                }
410                LeafUpdate::Touched => {}
411            }
412        }
413
414        // Drain remaining cursor entries, propagating dirty state.
415        self.buffers.cursor.drain(&mut self.arena);
416
417        #[cfg(debug_assertions)]
418        self.debug_assert_counters();
419    }
420
421    /// Reveals nodes inside this subtrie. Uses [`ArenaCursor::seek`] to locate the ancestor
422    /// node, then replaces blinded children with the proof nodes.
423    fn reveal_nodes(&mut self, nodes: &mut [ProofTrieNodeV2]) -> SparseTrieResult<()> {
424        if nodes.is_empty() {
425            return Ok(());
426        }
427        trace!(target: TRACE_TARGET, path = ?self.path, num_nodes = nodes.len(), "Subtrie reveal_nodes");
428
429        debug_assert!(
430            !matches!(self.arena[self.root], ArenaSparseNode::EmptyRoot),
431            "subtrie root must not be EmptyRoot in reveal_nodes"
432        );
433
434        self.buffers.cursor.reset(&self.arena, self.root, self.path);
435
436        for node in nodes.iter_mut() {
437            let find_result = self.buffers.cursor.seek(&mut self.arena, &node.path);
438            if ArenaParallelSparseTrie::reveal_node(
439                &mut self.arena,
440                &self.buffers.cursor,
441                node,
442                find_result,
443            )
444            .is_some_and(|child_idx| matches!(self.arena[child_idx], ArenaSparseNode::Leaf { .. }))
445            {
446                self.num_leaves += 1;
447            }
448        }
449
450        // Drain remaining cursor entries, propagating dirty state.
451        self.buffers.cursor.drain(&mut self.arena);
452
453        #[cfg(debug_assertions)]
454        self.debug_assert_counters();
455
456        Ok(())
457    }
458
459    /// Computes and caches `RlpNode` for all dirty nodes via iterative post-order DFS.
460    /// After this call every node reachable from `self.root` will be in `Cached` state.
461    ///
462    /// Trie updates are written directly to `self.buffers.updates` (if `Some`).
463    fn update_cached_rlp(&mut self) {
464        ArenaParallelSparseTrie::update_cached_rlp(
465            &mut self.arena,
466            self.root,
467            &mut self.buffers.cursor,
468            &mut self.buffers.rlp_buf,
469            &mut self.buffers.rlp_node_buf,
470            self.path,
471            &mut self.buffers.updates,
472        );
473        self.num_dirty_leaves = 0;
474        #[cfg(debug_assertions)]
475        self.debug_assert_counters();
476    }
477}
478
479/// Tracks the net change in leaf counters caused by a trie mutation (upsert or removal).
480/// Returned alongside [`UpsertLeafResult`] / [`RemoveLeafResult`] so the caller can maintain
481/// aggregate counters on [`ArenaSparseSubtrie`] without scanning the arena.
482#[derive(Debug, Default)]
483struct SubtrieCounterDeltas {
484    num_leaves_delta: i64,
485    num_dirty_leaves_delta: i64,
486}
487
488/// Result of `upsert_leaf` indicating whether a new child was created that the caller
489/// may need to wrap as a subtrie (in the upper trie).
490#[derive(Debug)]
491enum UpsertLeafResult {
492    /// A leaf was updated in place (no structural change).
493    Updated,
494    /// A new leaf was created (e.g. EmptyRoot→Leaf, or root-level split).
495    NewLeaf,
496    /// A new child (branch or leaf) was created or inserted. The child is the cursor head
497    /// and its parent is the cursor's parent.
498    NewChild,
499}
500
501/// Result of `remove_leaf` indicating whether a proof is needed to complete a branch
502/// collapse.
503#[derive(Debug)]
504enum RemoveLeafResult {
505    /// No proof needed — the removal (and any collapse) completed fully.
506    Removed,
507    /// No leaf was found at the given path (no-op).
508    NotFound,
509    /// The branch collapse requires revealing a blinded sibling. The caller must request a
510    /// proof for the given key at the given minimum depth.
511    NeedsProof { key: B256, proof_key: B256, min_len: u8 },
512}
513
514/// A proof request generated during leaf updates when a blinded node is encountered.
515#[derive(Debug, Clone)]
516struct ArenaRequiredProof {
517    /// The key requiring a proof.
518    key: B256,
519    /// Minimum depth at which proof nodes should be returned.
520    min_len: u8,
521}
522
523/// An arena-based parallel sparse trie.
524///
525/// Configuration for controlling when parallelism is enabled in [`ArenaParallelSparseTrie`]
526/// operations.
527#[derive(Debug, Clone, Copy, PartialEq, Eq)]
528pub struct ArenaParallelismThresholds {
529    /// Minimum number of dirty leaves in a subtrie before it is eligible for parallel hash
530    /// computation. Subtries with fewer dirty leaves than this are hashed serially during
531    /// [`ArenaParallelSparseTrie::update_subtrie_hashes`].
532    pub min_dirty_leaves: u64,
533    /// Minimum number of nodes to reveal in a subtrie before it is eligible for parallel
534    /// reveal. Subtries with fewer nodes to reveal than this are revealed inline during the
535    /// upper trie walk.
536    pub min_revealed_nodes: usize,
537    /// Minimum number of leaf updates targeting a subtrie before it is eligible for parallel
538    /// update. Subtries with fewer updates than this are updated inline during the upper trie
539    /// walk.
540    pub min_updates: usize,
541    /// Minimum number of revealed leaves in a subtrie before it is eligible for parallel
542    /// pruning. Subtries with fewer leaves than this are pruned inline during the upper trie
543    /// walk.
544    pub min_leaves_for_prune: u64,
545}
546
547impl Default for ArenaParallelismThresholds {
548    fn default() -> Self {
549        Self {
550            min_dirty_leaves: 64,
551            min_revealed_nodes: 16,
552            min_updates: 128,
553            min_leaves_for_prune: 128,
554        }
555    }
556}
557
558/// An arena-based sparse trie whose subtries can be mutated in parallel.
559///
560/// ## Structure
561///
562/// Uses arena allocation ([`slotmap::SlotMap`]) for node storage with direct index-based child
563/// pointers, avoiding the per-node hashing overhead of a `HashMap`-based trie. The trie is split
564/// into two tiers:
565///
566/// - **Upper trie** (`upper_arena`): Contains nodes whose path is shorter than
567///   `UPPER_TRIE_MAX_DEPTH` nibbles. These are the root and its immediate children.
568/// - **Lower subtries** (`ArenaSparseSubtrie`): Each child of an upper-trie branch at the depth
569///   boundary becomes the root of its own subtrie, stored as an `ArenaSparseNode::Subtrie` child in
570///   the upper arena. Each subtrie owns its own arena, enabling lock-free parallel mutation.
571///
572/// Node placement is determined by path length (not counting a branch's short key):
573///
574/// - Paths with **< `UPPER_TRIE_MAX_DEPTH`** nibbles live in `upper_arena`.
575/// - Paths with **≥ `UPPER_TRIE_MAX_DEPTH`** nibbles live in a subtrie.
576///
577/// ## Node Revealing
578///
579/// Nodes are lazily revealed from proof data via [`SparseTrie::reveal_nodes`]. Each node is
580/// placed into the upper arena or delegated to its subtrie based on path depth. Unrevealed
581/// children are stored as `ArenaSparseNodeBranchChild::Blinded` with their RLP encoding.
582/// When multiple subtries have pending reveals, they are processed in parallel using rayon
583/// (controlled by [`ArenaParallelismThresholds::min_revealed_nodes`]).
584///
585/// ## Leaf Operations
586///
587/// Leaf updates and removals are applied via [`SparseTrie::update_leaves`]. The method walks
588/// the upper trie to route each update to the correct subtrie, then processes subtries in
589/// parallel when the update count exceeds [`ArenaParallelismThresholds::min_updates`].
590///
591/// After updates, structural changes (branch collapse, subtrie unwrapping) are handled by
592/// propagating dirty state back up through the upper trie.
593///
594/// ## Root Hash Calculation
595///
596/// Root hash computation follows a bottom-up approach:
597///
598/// 1. **[`SparseTrie::update_subtrie_hashes`]**: Takes dirty subtries from the upper arena and
599///    hashes them in parallel (when dirty leaf count meets
600///    [`ArenaParallelismThresholds::min_dirty_leaves`]), then walks the upper trie to restore
601///    hashed subtries and inline-hash any remaining dirty nodes.
602/// 2. **[`SparseTrie::root`]**: Calls `update_subtrie_hashes`, then RLP-encodes the full upper trie
603///    depth-first to produce the root hash.
604///
605/// Each node tracks its state via `ArenaSparseNodeState` (`Revealed`, `Cached`, or `Dirty`)
606/// so only modified subtrees are recomputed.
607///
608/// ## Pruning
609///
610/// [`SparseTrie::prune`] removes revealed nodes that are not ancestors of any retained leaf.
611/// Pruned nodes are replaced with `ArenaSparseNodeBranchChild::Blinded` entries using their
612/// cached RLP. Subtries are pruned in parallel when their leaf count exceeds
613/// [`ArenaParallelismThresholds::min_leaves_for_prune`].
614#[derive(Debug, Clone)]
615pub struct ArenaParallelSparseTrie {
616    /// The arena allocating nodes in the upper trie.
617    upper_arena: NodeArena,
618    /// The root node of the upper trie.
619    root: Index,
620    /// Optional tracking of trie updates for database persistence.
621    updates: Option<SparseTrieUpdates>,
622    /// Reusable buffers for traversal, RLP encoding, and update actions.
623    buffers: ArenaTrieBuffers,
624    /// Thresholds controlling when parallelism is enabled for different operations.
625    parallelism_thresholds: ArenaParallelismThresholds,
626    /// Debug recorder for tracking mutating operations.
627    #[cfg(feature = "trie-debug")]
628    debug_recorder: TrieDebugRecorder,
629}
630
631impl ArenaParallelSparseTrie {
632    /// Sets the thresholds that control when parallelism is used during operations.
633    pub const fn with_parallelism_thresholds(
634        mut self,
635        thresholds: ArenaParallelismThresholds,
636    ) -> Self {
637        self.parallelism_thresholds = thresholds;
638        self
639    }
640
641    /// Resets the debug recorder and records the current trie state as `SetRoot` + `RevealNodes`
642    /// ops, representing the initial state at the beginning of a block (after pruning).
643    ///
644    /// Walks the upper arena and all subtries depth-first using the cursor, converting each
645    /// node into a [`crate::debug_recorder::ProofTrieNodeRecord`].
646    #[cfg(feature = "trie-debug")]
647    fn record_initial_state(&mut self) {
648        use crate::debug_recorder::{NodeStateRecord, TrieNodeRecord};
649        use alloy_primitives::hex;
650        use alloy_trie::nodes::{BranchNode, TrieNode};
651
652        fn state_to_record(state: &ArenaSparseNodeState) -> NodeStateRecord {
653            match state {
654                ArenaSparseNodeState::Revealed => NodeStateRecord::Revealed,
655                ArenaSparseNodeState::Cached { rlp_node } => {
656                    NodeStateRecord::Cached { rlp_node: hex::encode(rlp_node.as_ref()) }
657                }
658                ArenaSparseNodeState::Dirty => NodeStateRecord::Dirty,
659            }
660        }
661
662        /// Converts an [`ArenaSparseNode`] into a [`ProofTrieNodeRecord`] at the given path.
663        /// For branch children, resolves revealed children's cached RLP from `arena`.
664        /// Returns `None` for subtrie/taken-subtrie nodes (handled separately).
665        fn node_to_record(
666            arena: &NodeArena,
667            idx: Index,
668            path: Nibbles,
669        ) -> Option<ProofTrieNodeRecord> {
670            match &arena[idx] {
671                ArenaSparseNode::EmptyRoot => Some(ProofTrieNodeRecord {
672                    path,
673                    node: TrieNodeRecord(TrieNode::EmptyRoot),
674                    masks: None,
675                    short_key: None,
676                    state: None,
677                }),
678                ArenaSparseNode::Branch(b) => {
679                    let stack = b
680                        .children
681                        .iter()
682                        .map(|child| match child {
683                            ArenaSparseNodeBranchChild::Blinded(rlp) => rlp.clone(),
684                            ArenaSparseNodeBranchChild::Revealed(child_idx) => {
685                                // After pruning / root(), all nodes have cached RLP.
686                                arena[*child_idx]
687                                    .state_ref()
688                                    .and_then(|s| s.cached_rlp_node())
689                                    .cloned()
690                                    .unwrap_or_default()
691                            }
692                        })
693                        .collect();
694                    Some(ProofTrieNodeRecord {
695                        path,
696                        node: TrieNodeRecord(TrieNode::Branch(BranchNode::new(
697                            stack,
698                            b.state_mask,
699                        ))),
700                        masks: Some((
701                            b.branch_masks.hash_mask.get(),
702                            b.branch_masks.tree_mask.get(),
703                        )),
704                        short_key: (!b.short_key.is_empty()).then_some(b.short_key),
705                        state: Some(state_to_record(&b.state)),
706                    })
707                }
708                ArenaSparseNode::Leaf { key, value, state, .. } => Some(ProofTrieNodeRecord {
709                    path,
710                    node: TrieNodeRecord(TrieNode::Leaf(alloy_trie::nodes::LeafNode::new(
711                        *key,
712                        value.clone(),
713                    ))),
714                    masks: None,
715                    short_key: None,
716                    state: Some(state_to_record(state)),
717                }),
718                ArenaSparseNode::Subtrie(_) | ArenaSparseNode::TakenSubtrie => None,
719            }
720        }
721
722        /// Walks an arena depth-first using `cursor` and collects all nodes as records.
723        fn collect_records(
724            arena: &mut NodeArena,
725            root: Index,
726            root_path: Nibbles,
727            cursor: &mut ArenaCursor,
728            result: &mut Vec<ProofTrieNodeRecord>,
729        ) {
730            cursor.reset(arena, root, root_path);
731
732            // The cursor starts with root on the stack but `next` only yields children.
733            if let Some(record) = node_to_record(arena, root, root_path) {
734                result.push(record);
735            }
736
737            loop {
738                match cursor.next(arena, |_, node| {
739                    matches!(node, ArenaSparseNode::Branch(_) | ArenaSparseNode::Leaf { .. })
740                }) {
741                    NextResult::Done => break,
742                    NextResult::Branch | NextResult::NonBranch => {
743                        let head = cursor.head().expect("cursor is non-empty");
744                        if let Some(record) = node_to_record(arena, head.index, head.path) {
745                            result.push(record);
746                        }
747                    }
748                }
749            }
750        }
751
752        let mut nodes = Vec::new();
753
754        // Collect from the upper arena.
755        collect_records(
756            &mut self.upper_arena,
757            self.root,
758            Nibbles::default(),
759            &mut self.buffers.cursor,
760            &mut nodes,
761        );
762
763        // Collect from all subtries.
764        for (_, node) in &mut self.upper_arena {
765            if let ArenaSparseNode::Subtrie(subtrie) = node {
766                collect_records(
767                    &mut subtrie.arena,
768                    subtrie.root,
769                    subtrie.path,
770                    &mut self.buffers.cursor,
771                    &mut nodes,
772                );
773            }
774        }
775
776        // Reset the recorder and record that we pruned, then the initial state.
777        self.debug_recorder.reset();
778        self.debug_recorder.record(RecordedOp::Prune);
779
780        // First node is the root → SetRoot, remaining → RevealNodes.
781        if let Some(root_record) = nodes.first() {
782            self.debug_recorder.record(RecordedOp::SetRoot { node: root_record.clone() });
783        }
784        if nodes.len() > 1 {
785            self.debug_recorder.record(RecordedOp::RevealNodes { nodes: nodes[1..].to_vec() });
786        }
787    }
788
789    /// Returns `true` if a node at the given path length should be placed in a subtrie rather
790    /// than the upper arena.
791    const fn should_be_subtrie(path_len: usize) -> bool {
792        path_len == UPPER_TRIE_MAX_DEPTH
793    }
794
795    /// If the child at the cursor head should be a subtrie based on its depth, wraps it
796    /// in [`ArenaSparseNode::Subtrie`].
797    ///
798    /// The child must be the cursor head and its parent the cursor's parent.
799    fn maybe_wrap_in_subtrie(&mut self, child_idx: Index, child_path: &Nibbles) {
800        if !Self::should_be_subtrie(child_path.len()) {
801            return;
802        }
803
804        // Only branch and leaf nodes can become subtrie roots.
805        if !matches!(
806            self.upper_arena[child_idx],
807            ArenaSparseNode::Branch(_) | ArenaSparseNode::Leaf { .. }
808        ) {
809            return;
810        }
811
812        trace!(target: TRACE_TARGET, ?child_path, "Wrapping child into subtrie");
813        let mut subtrie = ArenaSparseSubtrie::new(self.updates.is_some());
814        subtrie.path = *child_path;
815        let mut root_node =
816            mem::replace(&mut self.upper_arena[child_idx], ArenaSparseNode::TakenSubtrie);
817
818        // Migrate any revealed children from the upper arena into the subtrie arena.
819        if let ArenaSparseNode::Branch(b) = &mut root_node {
820            for child in &mut b.children {
821                if let ArenaSparseNodeBranchChild::Revealed(idx) = child {
822                    *idx =
823                        Self::migrate_nodes(&mut subtrie.arena, &mut self.upper_arena, *idx, None);
824                }
825            }
826        }
827
828        subtrie.arena[subtrie.root] = root_node;
829        let (leaves, dirty) = Self::count_leaves_and_dirty(&subtrie.arena, subtrie.root);
830        subtrie.num_leaves = leaves;
831        subtrie.num_dirty_leaves = dirty;
832        #[cfg(debug_assertions)]
833        subtrie.debug_assert_counters();
834        self.upper_arena[child_idx] = ArenaSparseNode::Subtrie(subtrie);
835    }
836
837    /// If the cursor head is a branch, wraps any revealed children that sit at
838    /// the subtrie boundary depth (`UPPER_TRIE_MAX_DEPTH`). This is needed after
839    /// structural changes like root-level splits or subtrie unwraps that can place
840    /// non-subtrie nodes at the boundary depth.
841    fn maybe_wrap_branch_children(&mut self, cursor: &ArenaCursor) {
842        let head = cursor.head().expect("cursor is non-empty");
843        let head_idx = head.index;
844        let head_path = head.path;
845
846        let ArenaSparseNode::Branch(b) = &self.upper_arena[head_idx] else { return };
847        let short_key = b.short_key;
848        let children: SmallVec<[_; 4]> = b
849            .child_iter()
850            .filter_map(|(nibble, child)| match child {
851                ArenaSparseNodeBranchChild::Revealed(idx) => Some((nibble, *idx)),
852                ArenaSparseNodeBranchChild::Blinded(_) => None,
853            })
854            .collect();
855
856        for (nibble, child_idx) in children {
857            let mut child_path = head_path;
858            child_path.extend(&short_key);
859            child_path.push_unchecked(nibble);
860            self.maybe_wrap_in_subtrie(child_idx, &child_path);
861        }
862    }
863
864    /// Checks whether the subtrie at the cursor head has become empty after updates.
865    /// If the subtrie's root is [`ArenaSparseNode::EmptyRoot`] (all leaves were removed), the
866    /// child slot is removed from the parent branch entirely, the subtrie is recycled, and
867    /// if the parent is left with a single revealed child, it is collapsed via
868    /// `collapse_branch`.
869    ///
870    /// The subtrie must be the cursor head and its parent the cursor's parent.
871    /// Pops the subtrie entry (propagating leaf count deltas) before returning.
872    #[instrument(
873        level = "trace",
874        target = TRACE_TARGET,
875        skip_all,
876        fields(subtrie_path = ?cursor.head().expect("cursor is non-empty").path),
877    )]
878    fn maybe_unwrap_subtrie(&mut self, cursor: &mut ArenaCursor) {
879        let subtrie_idx = cursor.head().expect("cursor is non-empty").index;
880
881        let ArenaSparseNode::Subtrie(subtrie) = &self.upper_arena[subtrie_idx] else {
882            return;
883        };
884
885        if !matches!(subtrie.arena[subtrie.root], ArenaSparseNode::EmptyRoot) {
886            return;
887        }
888
889        let child_nibble = cursor
890            .head()
891            .expect("cursor is non-empty")
892            .path
893            .last()
894            .expect("subtrie path must have at least one nibble");
895        let parent_idx = cursor.parent().expect("cursor has parent").index;
896
897        // Pop the subtrie entry before mutating, so collapse_branch sees the parent as
898        // the cursor head.
899        cursor.pop(&mut self.upper_arena);
900
901        self.recycle_subtrie_from_idx(subtrie_idx);
902
903        trace!(target: TRACE_TARGET, "Unwrapping empty subtrie, removing child slot");
904        let parent_branch = self.upper_arena[parent_idx].branch_mut();
905        let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
906            .expect("child nibble not found in parent state_mask");
907
908        parent_branch.children.remove(child_idx.get());
909        parent_branch.unset_child_bit(child_nibble);
910        // The branch structure changed (child removed), so any cached RLP is stale.
911        parent_branch.state = parent_branch.state.to_dirty();
912
913        self.maybe_collapse_or_remove_branch(cursor);
914    }
915
916    /// Merges buffered updates from a [`ArenaSparseNode::Subtrie`] and drops it.
917    ///
918    /// # Panics
919    ///
920    /// Panics if `node` is not a `Subtrie`.
921    fn recycle_subtrie(&mut self, node: ArenaSparseNode) {
922        let ArenaSparseNode::Subtrie(mut subtrie) = node else {
923            unreachable!("recycle_subtrie called on non-Subtrie node")
924        };
925        Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
926    }
927
928    /// Removes a [`ArenaSparseNode::Subtrie`] from the upper arena at `idx` and recycles it.
929    fn recycle_subtrie_from_idx(&mut self, idx: Index) {
930        let node = self.upper_arena.remove(idx).expect("subtrie exists in arena");
931        self.recycle_subtrie(node);
932    }
933
934    /// Handles cascading structural changes on the branch at the cursor head after a child
935    /// has been removed.
936    ///
937    /// Depending on the remaining child count:
938    /// - **0 children**: the branch becomes `EmptyRoot` (if root) or is removed from its parent,
939    ///   cascading upward.
940    /// - **1 child**: collapses the branch into its sole child, unless that child is a
941    ///   `TakenSubtrie` (deferred) or blinded. If the remaining child is an empty subtrie, it is
942    ///   also removed, reducing to the 0-children case.
943    /// - **2+ children**: nothing to do.
944    fn maybe_collapse_or_remove_branch(&mut self, cursor: &mut ArenaCursor) {
945        loop {
946            let branch_idx = cursor.head().expect("cursor is non-empty").index;
947
948            // Read-only phase: extract the count and remaining-child info we need before
949            // mutating. All values here are Copy so the borrow is released.
950            let count = {
951                let ArenaSparseNode::Branch(b) = &self.upper_arena[branch_idx] else {
952                    return;
953                };
954                b.state_mask.count_bits()
955            };
956
957            if count >= 2 {
958                return;
959            }
960
961            if count == 0 {
962                if branch_idx == self.root {
963                    self.upper_arena[branch_idx] = ArenaSparseNode::EmptyRoot;
964                    return;
965                }
966                // Remove the empty branch from its parent.
967                let branch_nibble = cursor
968                    .head()
969                    .expect("cursor is non-empty")
970                    .path
971                    .last()
972                    .expect("non-root branch");
973                cursor.pop(&mut self.upper_arena);
974                self.upper_arena.remove(branch_idx);
975                let parent_idx = cursor.head().expect("cursor is non-empty").index;
976                let parent_branch = self.upper_arena[parent_idx].branch_mut();
977                let child_idx = BranchChildIdx::new(parent_branch.state_mask, branch_nibble)
978                    .expect("child nibble not found in parent state_mask");
979                parent_branch.children.remove(child_idx.get());
980                parent_branch.unset_child_bit(branch_nibble);
981                parent_branch.state = parent_branch.state.to_dirty();
982                continue; // re-check the parent
983            }
984
985            // count == 1 — determine what kind of child remains.
986            let (remaining_nibble, remaining_child_idx) = {
987                let b = self.upper_arena[branch_idx].branch_ref();
988                let nibble = b.state_mask.iter().next().expect("branch has at least one child");
989                let child_idx = match &b.children[0] {
990                    ArenaSparseNodeBranchChild::Revealed(idx) => Some(*idx),
991                    ArenaSparseNodeBranchChild::Blinded(_) => None,
992                };
993                (nibble, child_idx)
994            };
995
996            let Some(child_idx) = remaining_child_idx else {
997                debug_assert!(false, "single remaining child is blinded — should have been caught by check_subtrie_collapse_needs_proof");
998                return;
999            };
1000
1001            if matches!(self.upper_arena[child_idx], ArenaSparseNode::TakenSubtrie) {
1002                // Subtrie hasn't been restored yet; collapse is deferred to the
1003                // post-restore phase.
1004                return;
1005            }
1006
1007            // Check if the remaining child is an empty subtrie that should also be removed.
1008            let is_empty_subtrie = matches!(
1009                &self.upper_arena[child_idx],
1010                ArenaSparseNode::Subtrie(s) if matches!(s.arena[s.root], ArenaSparseNode::EmptyRoot)
1011            );
1012
1013            if is_empty_subtrie {
1014                self.recycle_subtrie_from_idx(child_idx);
1015                let branch = self.upper_arena[branch_idx].branch_mut();
1016                branch.children.remove(0);
1017                branch.unset_child_bit(remaining_nibble);
1018                branch.state = branch.state.to_dirty();
1019                continue; // now count == 0, will be handled next iteration
1020            }
1021
1022            // Normal collapse: the remaining child is a Leaf, Branch, or non-empty Subtrie.
1023            Self::collapse_branch(
1024                &mut self.upper_arena,
1025                cursor,
1026                &mut self.root,
1027                &mut self.buffers.updates,
1028            );
1029
1030            // After collapse, the remaining child (now at cursor head) may be a
1031            // Subtrie whose path was shortened by the collapsed branch's prefix. Since
1032            // should_be_subtrie requires path_len == UPPER_TRIE_MAX_DEPTH and the collapse
1033            // made the path shorter, the subtrie is no longer eligible — unwrap it.
1034            let child_idx = cursor.head().expect("cursor is non-empty").index;
1035            if let ArenaSparseNode::Subtrie(_) = &self.upper_arena[child_idx] {
1036                let ArenaSparseNode::Subtrie(mut subtrie) =
1037                    mem::replace(&mut self.upper_arena[child_idx], ArenaSparseNode::TakenSubtrie)
1038                else {
1039                    unreachable!()
1040                };
1041                Self::migrate_nodes(
1042                    &mut self.upper_arena,
1043                    &mut subtrie.arena,
1044                    subtrie.root,
1045                    Some(child_idx),
1046                );
1047                Self::merge_subtrie_updates(
1048                    &mut self.buffers.updates,
1049                    &mut subtrie.buffers.updates,
1050                );
1051
1052                // The migrated subtrie root may be a branch whose children now live in
1053                // the upper arena at or beyond the subtrie boundary depth. Re-wrap any
1054                // such children as subtries.
1055                self.maybe_wrap_branch_children(cursor);
1056            }
1057            return;
1058        }
1059    }
1060
1061    /// Merges updates from a subtrie's buffer into the parent's buffer.
1062    /// Both `dst` and `src` must be `Some` when updates are being tracked.
1063    ///
1064    /// Source removals cancel destination insertions (and vice versa) so that
1065    /// updates accumulated across multiple `root()` calls within a single block
1066    /// stay consistent.
1067    fn merge_subtrie_updates(
1068        dst: &mut Option<SparseTrieUpdates>,
1069        src: &mut Option<SparseTrieUpdates>,
1070    ) {
1071        if let Some(dst_updates) = dst.as_mut() {
1072            let src_updates = src.as_mut().expect("updates are enabled");
1073            debug_assert!(!src_updates.wiped, "subtrie updates should never have wiped=true");
1074
1075            // Source insertions cancel destination removals.
1076            for path in src_updates.updated_nodes.keys() {
1077                dst_updates.removed_nodes.remove(path);
1078            }
1079            dst_updates.updated_nodes.extend(src_updates.updated_nodes.drain());
1080
1081            // Source removals cancel destination insertions.
1082            for path in &src_updates.removed_nodes {
1083                dst_updates.updated_nodes.remove(path);
1084            }
1085            dst_updates.removed_nodes.extend(src_updates.removed_nodes.drain());
1086        }
1087    }
1088
1089    /// Right-pads a nibble path with zeros and packs it into a [`B256`].
1090    fn nibbles_to_padded_b256(path: &Nibbles) -> B256 {
1091        let mut bytes = [0u8; 32];
1092        path.pack_to(&mut bytes);
1093        B256::from(bytes)
1094    }
1095
1096    /// Returns the [`BranchNodeMasks`] for a branch based on the status of its children.
1097    fn get_branch_masks(arena: &NodeArena, branch: &ArenaSparseNodeBranch) -> BranchNodeMasks {
1098        let mut masks = BranchNodeMasks::default();
1099
1100        for (nibble, child) in branch.child_iter() {
1101            let (hash_bit, tree_bit) = match child {
1102                ArenaSparseNodeBranchChild::Blinded(_) => (
1103                    branch.branch_masks.hash_mask.is_bit_set(nibble),
1104                    branch.branch_masks.tree_mask.is_bit_set(nibble),
1105                ),
1106                ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1107                    let child = &arena[*child_idx];
1108                    (child.hash_mask_bit(), child.tree_mask_bit())
1109                }
1110            };
1111
1112            masks.set_child_bits(nibble, hash_bit, tree_bit);
1113        }
1114
1115        masks
1116    }
1117
1118    /// Computes and caches `RlpNode` for all dirty nodes reachable from `root` in `arena`.
1119    ///
1120    /// Uses the cursor's stack to walk dirty branches depth-first. For each branch,
1121    /// children are iterated left-to-right:
1122    /// - Blinded, cached, leaf, and `EmptyRoot` children have their `RlpNode` pushed directly onto
1123    ///   `rlp_node_buf`.
1124    /// - Dirty branch children are pushed onto `stack` and processed recursively first.
1125    ///
1126    /// When a dirty branch child finishes and is popped, the parent resumes iteration after
1127    /// the child's nibble. Once all children of a branch are processed, the branch is encoded
1128    /// via `BranchNodeRef` using the last N entries on `rlp_node_buf`, then replaced with a
1129    /// single result `RlpNode`.
1130    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, fields(base_path = ?base_path), ret)]
1131    fn update_cached_rlp(
1132        arena: &mut NodeArena,
1133        root: Index,
1134        cursor: &mut ArenaCursor,
1135        rlp_buf: &mut Vec<u8>,
1136        rlp_node_buf: &mut Vec<RlpNode>,
1137        base_path: Nibbles,
1138        updates: &mut Option<SparseTrieUpdates>,
1139    ) -> RlpNode {
1140        rlp_node_buf.clear();
1141
1142        // Step 1: Handle trivial roots that don't need the stack-based walk.
1143        // EmptyRoot has no state to update. Leaves are encoded in place. Already-cached
1144        // branches need no work. Only dirty branches enter the main loop below.
1145        match &arena[root] {
1146            ArenaSparseNode::EmptyRoot => return RlpNode::word_rlp(&EMPTY_ROOT_HASH),
1147            ArenaSparseNode::Leaf { .. } => {
1148                Self::encode_leaf(arena, root, rlp_buf, rlp_node_buf);
1149                return rlp_node_buf.pop().expect("encode_leaf must push an RlpNode");
1150            }
1151            ArenaSparseNode::Branch(b) => {
1152                if let ArenaSparseNodeState::Cached { rlp_node, .. } = &b.state {
1153                    return rlp_node.clone();
1154                }
1155            }
1156            ArenaSparseNode::Subtrie(_) | ArenaSparseNode::TakenSubtrie => {
1157                unreachable!("Subtrie/TakenSubtrie should not appear inside a subtrie's own arena");
1158            }
1159        }
1160
1161        cursor.reset(arena, root, base_path);
1162
1163        // Step 2: Walk dirty branches depth-first using `cursor.next`. Only dirty branches
1164        // are descended into; all other children (leaves, cached branches, blinded, subtries)
1165        // are encoded when their parent branch is popped.
1166        loop {
1167            let result = cursor.next(&mut *arena, |_, node| {
1168                matches!(
1169                    node,
1170                    ArenaSparseNode::Branch(b) if matches!(b.state, ArenaSparseNodeState::Dirty)
1171                )
1172            });
1173
1174            match result {
1175                NextResult::Done => break,
1176                NextResult::NonBranch => {
1177                    unreachable!("should_descend only returns true for dirty branches")
1178                }
1179                NextResult::Branch => {}
1180            };
1181
1182            let head = cursor.head().expect("cursor is non-empty");
1183            let head_idx = head.index;
1184            let head_path = head.path;
1185
1186            // The branch at `head_idx` is exhausted. All its dirty child branches
1187            // have already been encoded and cached. Collect all children's RLP nodes
1188            // and encode the branch.
1189            trace!(
1190                target: TRACE_TARGET,
1191                branch_path = ?head_path,
1192                branch_short_key = ?arena[head_idx].short_key().expect("head is a branch"),
1193                state_mask = ?arena[head_idx].branch_ref().state_mask,
1194                "Calculating branch RlpNode",
1195            );
1196
1197            rlp_node_buf.clear();
1198            let state_mask = arena[head_idx].branch_ref().state_mask;
1199            for (child_idx, _nibble) in BranchChildIter::new(state_mask) {
1200                match &arena[head_idx].branch_ref().children[child_idx] {
1201                    ArenaSparseNodeBranchChild::Blinded(rlp_node) => {
1202                        rlp_node_buf.push(rlp_node.clone());
1203                    }
1204                    ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1205                        let child_idx = *child_idx;
1206                        match &arena[child_idx] {
1207                            ArenaSparseNode::Leaf { .. } => {
1208                                Self::encode_leaf(arena, child_idx, rlp_buf, rlp_node_buf);
1209                            }
1210                            ArenaSparseNode::Branch(child_b) => {
1211                                let ArenaSparseNodeState::Cached { rlp_node, .. } = &child_b.state
1212                                else {
1213                                    panic!("child branch must be cached after DFS");
1214                                };
1215                                rlp_node_buf.push(rlp_node.clone());
1216                            }
1217                            ArenaSparseNode::Subtrie(subtrie) => {
1218                                let subtrie_root = &subtrie.arena[subtrie.root];
1219                                match subtrie_root {
1220                                    ArenaSparseNode::Branch(ArenaSparseNodeBranch {
1221                                        state: ArenaSparseNodeState::Cached { rlp_node, .. },
1222                                        ..
1223                                    }) |
1224                                    ArenaSparseNode::Leaf {
1225                                        state: ArenaSparseNodeState::Cached { rlp_node, .. },
1226                                        ..
1227                                    } => {
1228                                        rlp_node_buf.push(rlp_node.clone());
1229                                    }
1230                                    _ => panic!("subtrie root must be a cached Branch or Leaf"),
1231                                }
1232                            }
1233                            ArenaSparseNode::TakenSubtrie | ArenaSparseNode::EmptyRoot => {
1234                                unreachable!("Unexpected child {:?}", arena[child_idx]);
1235                            }
1236                        }
1237                    }
1238                }
1239            }
1240
1241            // Encode the branch, optionally wrapping in an extension if it has a short_key.
1242            let b = arena[head_idx].branch_ref();
1243            let short_key = b.short_key;
1244            let state_mask = b.state_mask;
1245            let prev_branch_masks = b.branch_masks;
1246            let new_branch_masks = Self::get_branch_masks(arena, b);
1247            let was_dirty = matches!(b.state, ArenaSparseNodeState::Dirty);
1248
1249            rlp_buf.clear();
1250            let rlp_node = BranchNodeRef::new(rlp_node_buf, state_mask).rlp(rlp_buf);
1251
1252            let rlp_node = if short_key.is_empty() {
1253                rlp_node
1254            } else {
1255                rlp_buf.clear();
1256                ExtensionNodeRef::new(&short_key, &rlp_node).rlp(rlp_buf)
1257            };
1258
1259            trace!(
1260                target: TRACE_TARGET,
1261                path = ?head_path,
1262                short_key = ?arena[head_idx].short_key(),
1263                children = ?state_mask.iter().zip(rlp_node_buf.iter()).collect::<Vec<_>>(),
1264                rlp_node = ?rlp_node,
1265                "Calculated branch RlpNode",
1266            );
1267
1268            let branch = arena[head_idx].branch_mut();
1269            branch.state = ArenaSparseNodeState::Cached { rlp_node: rlp_node.clone() };
1270            branch.branch_masks = new_branch_masks;
1271
1272            // Record trie updates for dirty branches only.
1273            // Skip the root node (empty logical path) as PST does.
1274            if let Some(trie_updates) = updates.as_mut().filter(|_| was_dirty) {
1275                let mut logical_path = head_path;
1276                logical_path.extend(&short_key);
1277
1278                if !logical_path.is_empty() {
1279                    if !prev_branch_masks.is_empty() && new_branch_masks.is_empty() {
1280                        trie_updates.updated_nodes.remove(&logical_path);
1281                        trie_updates.removed_nodes.insert(logical_path);
1282                    } else if !new_branch_masks.is_empty() {
1283                        let compact = arena[head_idx].branch_ref().branch_node_compact(arena);
1284                        trie_updates.updated_nodes.insert(logical_path, compact);
1285                        trie_updates.removed_nodes.remove(&logical_path);
1286                    }
1287                }
1288            }
1289        }
1290
1291        let ArenaSparseNodeState::Cached { rlp_node, .. } = &arena[root].branch_ref().state else {
1292            panic!("root must be cached after update_cached_rlp");
1293        };
1294        rlp_node.clone()
1295    }
1296
1297    /// Immutable traversal to find a leaf value at `full_path` starting from `root` in `arena`.
1298    /// `path_offset` is the number of nibbles already consumed from `full_path`.
1299    fn get_leaf_value_in_arena<'a>(
1300        arena: &'a NodeArena,
1301        mut current: Index,
1302        full_path: &Nibbles,
1303        mut path_offset: usize,
1304    ) -> Option<&'a Vec<u8>> {
1305        loop {
1306            match &arena[current] {
1307                ArenaSparseNode::EmptyRoot | ArenaSparseNode::TakenSubtrie => return None,
1308                ArenaSparseNode::Leaf { key, value, .. } => {
1309                    let remaining = full_path.slice(path_offset..);
1310                    return (remaining == *key).then_some(value);
1311                }
1312                ArenaSparseNode::Branch(b) => {
1313                    let short_key = &b.short_key;
1314                    let logical_end = path_offset + short_key.len();
1315                    if full_path.len() <= logical_end ||
1316                        full_path.slice(path_offset..logical_end) != *short_key
1317                    {
1318                        return None;
1319                    }
1320
1321                    let child_nibble = full_path.get_unchecked(logical_end);
1322                    let child_idx = BranchChildIdx::new(b.state_mask, child_nibble)?;
1323                    match &b.children[child_idx] {
1324                        ArenaSparseNodeBranchChild::Blinded(_) => return None,
1325                        ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1326                            current = *child_idx;
1327                            path_offset = logical_end + 1;
1328                        }
1329                    }
1330                }
1331                ArenaSparseNode::Subtrie(subtrie) => {
1332                    return Self::get_leaf_value_in_arena(
1333                        &subtrie.arena,
1334                        subtrie.root,
1335                        full_path,
1336                        path_offset,
1337                    );
1338                }
1339            }
1340        }
1341    }
1342
1343    /// Immutable traversal from the given root in `arena`, following `full_path` to find a leaf.
1344    /// Returns whether the leaf exists or not, or an error if a blinded node is encountered or
1345    /// the value doesn't match.
1346    fn find_leaf_in_arena(
1347        arena: &NodeArena,
1348        mut current: Index,
1349        full_path: &Nibbles,
1350        mut path_offset: usize,
1351        expected_value: Option<&Vec<u8>>,
1352    ) -> Result<LeafLookup, LeafLookupError> {
1353        loop {
1354            match &arena[current] {
1355                ArenaSparseNode::EmptyRoot | ArenaSparseNode::TakenSubtrie => {
1356                    return Ok(LeafLookup::NonExistent);
1357                }
1358                ArenaSparseNode::Leaf { key, value, .. } => {
1359                    let remaining = full_path.slice(path_offset..);
1360                    if remaining != *key {
1361                        return Ok(LeafLookup::NonExistent);
1362                    }
1363                    if let Some(expected) = expected_value &&
1364                        *expected != *value
1365                    {
1366                        return Err(LeafLookupError::ValueMismatch {
1367                            path: *full_path,
1368                            expected: Some(expected.clone()),
1369                            actual: value.clone(),
1370                        });
1371                    }
1372                    return Ok(LeafLookup::Exists);
1373                }
1374                ArenaSparseNode::Branch(b) => {
1375                    let short_key = &b.short_key;
1376                    let logical_end = path_offset + short_key.len();
1377
1378                    if full_path.len() <= logical_end {
1379                        return Ok(LeafLookup::NonExistent);
1380                    }
1381
1382                    if full_path.slice(path_offset..logical_end) != *short_key {
1383                        return Ok(LeafLookup::NonExistent);
1384                    }
1385
1386                    let child_nibble = full_path.get_unchecked(logical_end);
1387                    let Some(child_idx) = BranchChildIdx::new(b.state_mask, child_nibble) else {
1388                        return Ok(LeafLookup::NonExistent);
1389                    };
1390
1391                    match &b.children[child_idx] {
1392                        ArenaSparseNodeBranchChild::Blinded(rlp_node) => {
1393                            let hash = rlp_node
1394                                .as_hash()
1395                                .unwrap_or_else(|| keccak256(rlp_node.as_slice()));
1396                            let mut blinded_path = full_path.slice(..logical_end);
1397                            blinded_path.push_unchecked(child_nibble);
1398                            return Err(LeafLookupError::BlindedNode { path: blinded_path, hash });
1399                        }
1400                        ArenaSparseNodeBranchChild::Revealed(child_idx) => {
1401                            current = *child_idx;
1402                            path_offset = logical_end + 1;
1403                        }
1404                    }
1405                }
1406                ArenaSparseNode::Subtrie(subtrie) => {
1407                    return Self::find_leaf_in_arena(
1408                        &subtrie.arena,
1409                        subtrie.root,
1410                        full_path,
1411                        path_offset,
1412                        expected_value,
1413                    );
1414                }
1415            }
1416        }
1417    }
1418
1419    /// Encodes a leaf node's RLP and pushes it onto `rlp_node_buf`. If the leaf is already
1420    /// cached, its existing `RlpNode` is reused.
1421    fn encode_leaf(
1422        arena: &mut NodeArena,
1423        idx: Index,
1424        rlp_buf: &mut Vec<u8>,
1425        rlp_node_buf: &mut Vec<RlpNode>,
1426    ) {
1427        let (key, value, state) = match &arena[idx] {
1428            ArenaSparseNode::Leaf { key, value, state } => (key, value, state),
1429            _ => unreachable!("encode_leaf called on non-Leaf node"),
1430        };
1431
1432        if let ArenaSparseNodeState::Cached { rlp_node, .. } = state {
1433            rlp_node_buf.push(rlp_node.clone());
1434            return;
1435        }
1436
1437        rlp_buf.clear();
1438        let rlp_node = LeafNodeRef { key, value }.rlp(rlp_buf);
1439
1440        *arena[idx].state_mut() = ArenaSparseNodeState::Cached { rlp_node: rlp_node.clone() };
1441        rlp_node_buf.push(rlp_node);
1442    }
1443
1444    /// Creates a new leaf and a new branch that splits an existing child from the new leaf at
1445    /// a divergence point. Returns the index of the new branch.
1446    ///
1447    /// `new_leaf_path` is the full remaining path for the new leaf (relative to the split
1448    /// point's parent).
1449    ///
1450    /// The old child's key (leaf) or `short_key` (branch) is truncated to the suffix after the
1451    /// divergence nibble and its state is set to dirty.
1452    ///
1453    /// The top of `stack` must be the leaf or branch being split. The top of stack will be the
1454    /// newly created branch once this returns.
1455    /// Returns `true` if the existing node was not already dirty (i.e., the split newly dirtied
1456    /// it).
1457    fn split_and_insert_leaf(
1458        arena: &mut NodeArena,
1459        cursor: &mut ArenaCursor,
1460        root: &mut Index,
1461        new_leaf_path: Nibbles,
1462        value: &[u8],
1463    ) -> bool {
1464        let old_child_entry = cursor.head().expect("cursor must have head");
1465        let old_child_idx = old_child_entry.index;
1466        let old_child_short_key = arena[old_child_idx].short_key().expect("top of stack is a leaf");
1467        let diverge_len = new_leaf_path.common_prefix_length(old_child_short_key);
1468
1469        trace!(
1470            target: TRACE_TARGET,
1471            path = ?old_child_entry.path,
1472            ?new_leaf_path,
1473            ?old_child_short_key,
1474            diverge_len,
1475            "Splitting node and inserting new leaf",
1476        );
1477
1478        let old_child_nibble = old_child_short_key.get_unchecked(diverge_len);
1479        let old_child_suffix = old_child_short_key.slice(diverge_len + 1..);
1480
1481        // Truncate the old child's key/short_key and mark it dirty.
1482        // Track whether the existing node was not already dirty (a leaf that becomes newly dirty).
1483        let newly_dirtied_existing = match &mut arena[old_child_idx] {
1484            ArenaSparseNode::Leaf { key, state, .. } => {
1485                *key = old_child_suffix;
1486                let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
1487                *state = ArenaSparseNodeState::Dirty;
1488                was_clean
1489            }
1490            ArenaSparseNode::Branch(b) => {
1491                b.short_key = old_child_suffix;
1492                b.state = b.state.to_dirty();
1493                // Branches don't contribute to num_dirty_leaves.
1494                false
1495            }
1496            _ => unreachable!("split_and_insert_leaf called on non-Leaf/Branch node"),
1497        };
1498
1499        let short_key = new_leaf_path.slice(..diverge_len);
1500        let new_leaf_nibble = new_leaf_path.get_unchecked(diverge_len);
1501        debug_assert_ne!(old_child_nibble, new_leaf_nibble);
1502
1503        let new_leaf_idx = arena.insert(ArenaSparseNode::Leaf {
1504            state: ArenaSparseNodeState::Dirty,
1505            key: new_leaf_path.slice(diverge_len + 1..),
1506            value: value.to_vec(),
1507        });
1508
1509        let (first_nibble, first_child, second_nibble, second_child) =
1510            if old_child_nibble < new_leaf_nibble {
1511                (old_child_nibble, old_child_idx, new_leaf_nibble, new_leaf_idx)
1512            } else {
1513                (new_leaf_nibble, new_leaf_idx, old_child_nibble, old_child_idx)
1514            };
1515
1516        let state_mask = TrieMask::from(1u16 << first_nibble | 1u16 << second_nibble);
1517        let mut children = SmallVec::with_capacity(2);
1518        children.push(ArenaSparseNodeBranchChild::Revealed(first_child));
1519        children.push(ArenaSparseNodeBranchChild::Revealed(second_child));
1520
1521        let new_branch_idx = arena.insert(ArenaSparseNode::Branch(ArenaSparseNodeBranch {
1522            state: ArenaSparseNodeState::Dirty,
1523            children,
1524            state_mask,
1525            short_key,
1526            branch_masks: BranchNodeMasks::default(),
1527        }));
1528
1529        cursor.replace_head_index(arena, root, new_branch_idx);
1530        newly_dirtied_existing
1531    }
1532
1533    /// Performs a leaf upsert using a pre-computed [`SeekResult`] from
1534    /// [`ArenaCursor::seek`].
1535    ///
1536    /// Handles three cases based on `find_result`:
1537    /// 1. `RevealedLeaf` — the cursor head is a leaf; update in place or split into a branch.
1538    /// 2. Diverged — the path diverges within the branch's `short_key`, split it.
1539    /// 3. `NoChild` — the target nibble has no child, insert a new leaf.
1540    ///
1541    /// The caller must handle [`SeekResult::Blinded`] and
1542    /// [`SeekResult::RevealedSubtrie`] before calling this function.
1543    /// The cursor must be non-empty when called.
1544    ///
1545    /// Returns an [`UpsertLeafResult`] and [`SubtrieCounterDeltas`] so the caller can maintain
1546    /// aggregate counters and decide whether to wrap the result as a subtrie.
1547    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, fields(full_path = ?full_path))]
1548    fn upsert_leaf(
1549        arena: &mut NodeArena,
1550        cursor: &mut ArenaCursor,
1551        root: &mut Index,
1552        full_path: &Nibbles,
1553        value: &[u8],
1554        find_result: SeekResult,
1555    ) -> (UpsertLeafResult, SubtrieCounterDeltas) {
1556        trace!(target: TRACE_TARGET, ?find_result, "Upserting leaf");
1557        let head = cursor.head().expect("cursor is non-empty");
1558
1559        match find_result {
1560            SeekResult::Blinded => {
1561                unreachable!("Blinded case must be handled by caller")
1562            }
1563            SeekResult::EmptyRoot => {
1564                let head_idx = head.index;
1565                let head_path = head.path;
1566                arena[head_idx] = ArenaSparseNode::Leaf {
1567                    state: ArenaSparseNodeState::Dirty,
1568                    key: full_path.slice(head_path.len()..),
1569                    value: value.to_vec(),
1570                };
1571                (
1572                    UpsertLeafResult::NewLeaf,
1573                    SubtrieCounterDeltas { num_leaves_delta: 1, num_dirty_leaves_delta: 1 },
1574                )
1575            }
1576            SeekResult::RevealedLeaf => {
1577                // RevealedLeaf guarantees the leaf's full path matches the target exactly.
1578                let head_idx = head.index;
1579                let was_clean =
1580                    if let ArenaSparseNode::Leaf { value: v, state, .. } = &mut arena[head_idx] {
1581                        v.clear();
1582                        v.extend_from_slice(value);
1583                        let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
1584                        *state = ArenaSparseNodeState::Dirty;
1585                        was_clean
1586                    } else {
1587                        unreachable!("RevealedLeaf but cursor head is not a leaf")
1588                    };
1589                (
1590                    UpsertLeafResult::Updated,
1591                    SubtrieCounterDeltas {
1592                        num_leaves_delta: 0,
1593                        num_dirty_leaves_delta: was_clean as i64,
1594                    },
1595                )
1596            }
1597            SeekResult::Diverged => {
1598                let head_path = head.path;
1599                let full_path_from_head = full_path.slice(head_path.len()..);
1600
1601                let split_dirtied_existing =
1602                    Self::split_and_insert_leaf(arena, cursor, root, full_path_from_head, value);
1603
1604                let result = if cursor.depth() >= 1 {
1605                    UpsertLeafResult::NewChild
1606                } else {
1607                    UpsertLeafResult::NewLeaf
1608                };
1609                (
1610                    result,
1611                    SubtrieCounterDeltas {
1612                        num_leaves_delta: 1,
1613                        num_dirty_leaves_delta: 1 + split_dirtied_existing as i64,
1614                    },
1615                )
1616            }
1617            SeekResult::NoChild { child_nibble } => {
1618                let head_idx = head.index;
1619
1620                let head_branch_logical_path = cursor.head_logical_branch_path(arena);
1621                let leaf_key = full_path.slice(head_branch_logical_path.len() + 1..);
1622                let new_leaf = arena.insert(ArenaSparseNode::Leaf {
1623                    state: ArenaSparseNodeState::Dirty,
1624                    key: leaf_key,
1625                    value: value.to_vec(),
1626                });
1627
1628                let branch = arena[head_idx].branch_mut();
1629                branch.set_child(child_nibble, ArenaSparseNodeBranchChild::Revealed(new_leaf));
1630
1631                // Re-seek to position the cursor on the newly inserted leaf.
1632                cursor.seek(arena, full_path);
1633
1634                (
1635                    UpsertLeafResult::NewChild,
1636                    SubtrieCounterDeltas { num_leaves_delta: 1, num_dirty_leaves_delta: 1 },
1637                )
1638            }
1639            SeekResult::RevealedSubtrie => {
1640                unreachable!("RevealedSubtrie must be handled by caller")
1641            }
1642        }
1643    }
1644
1645    /// Removes a leaf node from the trie using a pre-computed [`SeekResult`] from
1646    /// [`ArenaCursor::seek`].
1647    ///
1648    /// Only the `RevealedLeaf` case performs a removal — the leaf must exist and its full path
1649    /// must match `full_path`. All other cases (`Diverged`, `NoChild`) are no-ops since the leaf
1650    /// doesn't exist at that path.
1651    ///
1652    /// When removing a leaf from a branch, if the branch is left with only one remaining child,
1653    /// the branch is collapsed: the remaining child absorbs the branch's `short_key` + the child's
1654    /// nibble as a prefix to its own key/`short_key`, and replaces the branch in the parent.
1655    /// If the remaining child is blinded, the collapse cannot proceed and a
1656    /// [`RemoveLeafResult::NeedsProof`] is returned so the caller can request a proof.
1657    ///
1658    /// The caller must handle [`SeekResult::Blinded`] and
1659    /// [`SeekResult::RevealedSubtrie`] before calling this function.
1660    fn remove_leaf(
1661        arena: &mut NodeArena,
1662        cursor: &mut ArenaCursor,
1663        root: &mut Index,
1664        key: B256,
1665        full_path: &Nibbles,
1666        find_result: SeekResult,
1667        updates: &mut Option<SparseTrieUpdates>,
1668    ) -> (RemoveLeafResult, SubtrieCounterDeltas) {
1669        match find_result {
1670            SeekResult::Blinded | SeekResult::RevealedSubtrie => {
1671                unreachable!("Blinded/RevealedSubtrie must be handled by caller")
1672            }
1673            SeekResult::EmptyRoot | SeekResult::Diverged | SeekResult::NoChild { .. } => {
1674                (RemoveLeafResult::NotFound, SubtrieCounterDeltas::default())
1675            }
1676            SeekResult::RevealedLeaf => {
1677                // RevealedLeaf guarantees the leaf's full path matches the target exactly.
1678                let head = cursor.head().expect("cursor is non-empty");
1679                let head_idx = head.index;
1680                let head_path = head.path;
1681
1682                trace!(
1683                    target: TRACE_TARGET,
1684                    path = ?head_path,
1685                    ?full_path,
1686                    "Removing leaf",
1687                );
1688
1689                // Before mutating, check if removing this leaf would leave the parent
1690                // branch with a single blinded sibling (requiring a proof to collapse).
1691                if let Some(parent_entry) = cursor.parent() {
1692                    let parent_idx = parent_entry.index;
1693                    let child_nibble = head_path.last().expect("non-root leaf");
1694                    let parent_branch = arena[parent_idx].branch_ref();
1695
1696                    if parent_branch.state_mask.count_bits() == 2 &&
1697                        parent_branch.sibling_child(child_nibble).is_blinded()
1698                    {
1699                        let sibling_nibble = parent_branch
1700                            .state_mask
1701                            .iter()
1702                            .find(|&n| n != child_nibble)
1703                            .expect("branch has two children");
1704                        let mut sibling_path = cursor.parent_logical_branch_path(arena);
1705                        sibling_path.push_unchecked(sibling_nibble);
1706                        trace!(target: TRACE_TARGET, ?full_path, ?sibling_path, "Removal would collapse branch onto blinded sibling, requesting proof");
1707                        return (
1708                            RemoveLeafResult::NeedsProof {
1709                                key,
1710                                proof_key: Self::nibbles_to_padded_b256(&sibling_path),
1711                                min_len: (sibling_path.len() as u8).min(64),
1712                            },
1713                            SubtrieCounterDeltas::default(),
1714                        );
1715                    }
1716                }
1717
1718                // Check if the removed leaf was dirty before removing it.
1719                let removed_was_dirty =
1720                    matches!(arena[head_idx].state_ref(), Some(ArenaSparseNodeState::Dirty));
1721
1722                if cursor.depth() == 0 {
1723                    // The leaf is the root — replace with EmptyRoot and reset the cursor
1724                    // so subsequent iterations can call seek normally.
1725                    arena.remove(head_idx);
1726                    *root = arena.insert(ArenaSparseNode::EmptyRoot);
1727                    cursor.reset(arena, *root, head_path);
1728                    return (
1729                        RemoveLeafResult::Removed,
1730                        SubtrieCounterDeltas {
1731                            num_leaves_delta: -1,
1732                            num_dirty_leaves_delta: -(removed_was_dirty as i64),
1733                        },
1734                    );
1735                }
1736
1737                // Pop the leaf entry, propagating dirty state to the parent.
1738                cursor.pop(arena);
1739
1740                // The parent must be a branch. Remove the leaf from it.
1741                let parent_entry = cursor.head().expect("cursor is non-empty");
1742                let parent_idx = parent_entry.index;
1743                let child_nibble = head_path.last().expect("non-root leaf");
1744
1745                // Remove the leaf from the arena and from the parent's children.
1746                arena.remove(head_idx);
1747                let parent_branch = arena[parent_idx].branch_mut();
1748                parent_branch.remove_child(child_nibble);
1749
1750                // If the branch now has only one child, collapse it. The blinded sibling
1751                // case was already handled above before any mutations.
1752                let collapse_dirtied_leaf = if parent_branch.state_mask.count_bits() == 1 {
1753                    Self::collapse_branch(arena, cursor, root, updates)
1754                } else {
1755                    false
1756                };
1757                (
1758                    RemoveLeafResult::Removed,
1759                    SubtrieCounterDeltas {
1760                        num_leaves_delta: -1,
1761                        num_dirty_leaves_delta: (collapse_dirtied_leaf as i64) -
1762                            (removed_was_dirty as i64),
1763                    },
1764                )
1765            }
1766        }
1767    }
1768
1769    /// Checks whether a subtrie receiving only removals would cause its parent branch to collapse
1770    /// onto a single blinded sibling. If so, returns the proof needed to reveal that blinded
1771    /// sibling so the caller can request it and skip the subtrie's updates.
1772    ///
1773    /// Returns `Some(proof)` for the blinded sibling when the edge-case applies, `None` otherwise.
1774    fn check_subtrie_collapse_needs_proof(
1775        arena: &NodeArena,
1776        cursor: &ArenaCursor,
1777        subtrie_updates: &[(B256, Nibbles, LeafUpdate)],
1778    ) -> Option<ArenaRequiredProof> {
1779        let num_removals = subtrie_updates
1780            .iter()
1781            .filter(|(_, _, u)| matches!(u, LeafUpdate::Changed(v) if v.is_empty()))
1782            .count() as u64;
1783
1784        // Touched is a no-op that doesn't alter trie structure, so it must be
1785        // excluded when deciding whether "all updates are removals". This mirrors
1786        // the `all_removals` / `might_empty_subtrie` filter in `update_leaves`.
1787        // Without this, a batch of removals + Touched entries
1788        // would fail the `num_removals != num_changed` check, skip the proof
1789        // request for the blinded sibling, and later panic in
1790        // `maybe_collapse_or_remove_branch` when the subtrie empties inline.
1791        let num_changed =
1792            subtrie_updates.iter().filter(|(_, _, u)| matches!(u, LeafUpdate::Changed(_))).count()
1793                as u64;
1794
1795        if num_removals == 0 || num_removals != num_changed {
1796            return None;
1797        }
1798
1799        // The subtrie is the cursor head; its parent is the cursor's parent.
1800        let subtrie_entry = cursor.head()?;
1801        let subtrie_num_leaves = match &arena[subtrie_entry.index] {
1802            ArenaSparseNode::Subtrie(s) => s.num_leaves,
1803            _ => return None,
1804        };
1805        if num_removals < subtrie_num_leaves {
1806            return None;
1807        }
1808
1809        let child_nibble =
1810            subtrie_entry.path.last().expect("subtrie path must have at least one nibble");
1811
1812        let parent_entry = cursor.parent()?;
1813        let parent_branch = arena[parent_entry.index].branch_ref();
1814        if parent_branch.state_mask.count_bits() != 2 {
1815            return None;
1816        }
1817
1818        if !parent_branch.sibling_child(child_nibble).is_blinded() {
1819            return None;
1820        }
1821
1822        let sibling_nibble = parent_branch
1823            .state_mask
1824            .iter()
1825            .find(|&n| n != child_nibble)
1826            .expect("branch has two children");
1827        let mut sibling_path = cursor.parent_logical_branch_path(arena);
1828        sibling_path.push_unchecked(sibling_nibble);
1829
1830        Some(ArenaRequiredProof {
1831            key: Self::nibbles_to_padded_b256(&sibling_path),
1832            min_len: (sibling_path.len() as u8).min(64),
1833        })
1834    }
1835
1836    /// Collapses a branch node that has exactly one remaining revealed child. The branch's
1837    /// `short_key`, the remaining child's nibble, and the child's own key/`short_key` are
1838    /// concatenated to form the child's new key/`short_key`. The child then replaces the branch
1839    /// in the grandparent (or becomes the new root).
1840    ///
1841    /// The caller must verify that the remaining child is not blinded before calling this function.
1842    ///
1843    /// The branch being collapsed must be the current cursor head. The cursor head will be
1844    /// replaced with the remaining child which has taken its place.
1845    /// Returns `true` if the collapse dirtied a surviving leaf that was not already dirty.
1846    fn collapse_branch(
1847        arena: &mut NodeArena,
1848        cursor: &mut ArenaCursor,
1849        root: &mut Index,
1850        updates: &mut Option<SparseTrieUpdates>,
1851    ) -> bool {
1852        let branch_entry = cursor.head().expect("cursor is non-empty");
1853        let branch_idx = branch_entry.index;
1854        let branch = arena[branch_idx].branch_ref();
1855        let remaining_nibble =
1856            branch.state_mask.iter().next().expect("branch has at least one child");
1857        let branch_short_key = branch.short_key;
1858
1859        debug_assert_eq!(
1860            branch.state_mask.count_bits(),
1861            1,
1862            "collapse_branch requires exactly 1 child"
1863        );
1864        debug_assert!(
1865            !branch.children[0].is_blinded(),
1866            "collapse_branch called with a blinded remaining child"
1867        );
1868
1869        trace!(
1870            target: TRACE_TARGET,
1871            path = ?branch_entry.path,
1872            short_key = ?branch_short_key,
1873            branch_masks = ?branch.branch_masks,
1874            ?remaining_nibble,
1875            "Collapsing single-child branch",
1876        );
1877
1878        // Record the collapsed branch's logical path for trie update tracking if it
1879        // was previously persisted in the DB trie.
1880        if let Some(trie_updates) = updates.as_mut() &&
1881            !branch.branch_masks.is_empty()
1882        {
1883            let logical_path = cursor.head_logical_branch_path(arena);
1884            if !logical_path.is_empty() {
1885                trie_updates.updated_nodes.remove(&logical_path);
1886                trie_updates.removed_nodes.insert(logical_path);
1887            }
1888        }
1889
1890        // Build the prefix: branch's short_key + remaining child's nibble.
1891        let mut prefix = branch_short_key;
1892        prefix.push_unchecked(remaining_nibble);
1893
1894        let ArenaSparseNodeBranchChild::Revealed(child_idx) = branch.children[0] else {
1895            unreachable!()
1896        };
1897
1898        // Prepend the prefix to the child's key/short_key and mark dirty.
1899        // Track whether a leaf was newly dirtied by this collapse.
1900        let newly_dirtied_leaf = match &mut arena[child_idx] {
1901            ArenaSparseNode::Leaf { key, state, .. } => {
1902                let mut new_key = prefix;
1903                new_key.extend(key);
1904                *key = new_key;
1905                let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
1906                *state = ArenaSparseNodeState::Dirty;
1907                was_clean
1908            }
1909            ArenaSparseNode::Branch(b) => {
1910                let mut new_short_key = prefix;
1911                new_short_key.extend(&b.short_key);
1912                b.short_key = new_short_key;
1913                b.state = b.state.to_dirty();
1914                false
1915            }
1916            ArenaSparseNode::Subtrie(subtrie) => {
1917                subtrie.path = branch_entry.path;
1918                match &mut subtrie.arena[subtrie.root] {
1919                    ArenaSparseNode::Branch(b) => {
1920                        let mut new_short_key = prefix;
1921                        new_short_key.extend(&b.short_key);
1922                        b.short_key = new_short_key;
1923                        b.state = b.state.to_dirty();
1924                    }
1925                    ArenaSparseNode::Leaf { key, state, .. } => {
1926                        let mut new_key = prefix;
1927                        new_key.extend(key);
1928                        *key = new_key;
1929                        let was_clean = !matches!(state, ArenaSparseNodeState::Dirty);
1930                        *state = ArenaSparseNodeState::Dirty;
1931                        if was_clean {
1932                            subtrie.num_dirty_leaves += 1;
1933                        }
1934                    }
1935                    _ => {
1936                        unreachable!("subtrie root must be a Branch or Leaf during collapse_branch")
1937                    }
1938                }
1939                false
1940            }
1941            _ => unreachable!("remaining child must be Leaf, Branch, or Subtrie"),
1942        };
1943
1944        // Replace the branch with the remaining child in the grandparent (or root).
1945        cursor.replace_head_index(arena, root, child_idx);
1946
1947        // Free the collapsed branch.
1948        arena.remove(branch_idx);
1949        newly_dirtied_leaf
1950    }
1951
1952    /// Counts the total leaves and dirty leaves in a subtree rooted at `idx`.
1953    fn count_leaves_and_dirty(arena: &NodeArena, idx: Index) -> (u64, u64) {
1954        match &arena[idx] {
1955            ArenaSparseNode::Leaf { state, .. } => {
1956                let dirty = matches!(state, ArenaSparseNodeState::Dirty) as u64;
1957                (1, dirty)
1958            }
1959            ArenaSparseNode::Branch(b) => {
1960                let mut leaves = 0u64;
1961                let mut dirty = 0u64;
1962                for c in &b.children {
1963                    if let ArenaSparseNodeBranchChild::Revealed(child_idx) = c {
1964                        let (l, d) = Self::count_leaves_and_dirty(arena, *child_idx);
1965                        leaves += l;
1966                        dirty += d;
1967                    }
1968                }
1969                (leaves, dirty)
1970            }
1971            _ => (0, 0),
1972        }
1973    }
1974
1975    /// Asserts that every node in the upper arena satisfies the subtrie structure invariant:
1976    /// - Nodes at `UPPER_TRIE_MAX_DEPTH` path length must be `Subtrie` (or `TakenSubtrie`).
1977    /// - Nodes at other depths must NOT be `Subtrie`.
1978    ///
1979    /// Uses the cursor to DFS the upper arena, checking each visited node's path length.
1980    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
1981    #[cfg(debug_assertions)]
1982    fn debug_assert_subtrie_structure(&mut self) {
1983        let mut cursor = mem::take(&mut self.buffers.cursor);
1984        cursor.reset(&self.upper_arena, self.root, Nibbles::default());
1985
1986        loop {
1987            let result = cursor.next(&mut self.upper_arena, |_, _| true);
1988            match result {
1989                NextResult::Done => break,
1990                NextResult::NonBranch | NextResult::Branch => {
1991                    let head = cursor.head().expect("cursor is non-empty");
1992                    let path_len = head.path.len();
1993                    let node = &self.upper_arena[head.index];
1994
1995                    if Self::should_be_subtrie(path_len) {
1996                        debug_assert!(
1997                            matches!(
1998                                node,
1999                                ArenaSparseNode::Subtrie(_) | ArenaSparseNode::TakenSubtrie
2000                            ),
2001                            "node at path_len={path_len} should be a Subtrie but is {node:?}",
2002                        );
2003                    } else {
2004                        debug_assert!(
2005                            !matches!(node, ArenaSparseNode::Subtrie(_)),
2006                            "node at path_len={path_len} should NOT be a Subtrie but is",
2007                        );
2008                    }
2009                }
2010            }
2011        }
2012
2013        self.buffers.cursor = cursor;
2014    }
2015
2016    /// Recursively migrates all nodes from `src` into `dst`, starting at `src_idx`.
2017    /// Branch children's `Revealed` indices are remapped to the new `dst` indices during
2018    /// the migration.
2019    ///
2020    /// If `dst_slot` is `Some(idx)`, the node at `src_idx` is placed into `dst[idx]`
2021    /// (overwriting); otherwise a new slot is allocated. Returns the `dst` index of the
2022    /// migrated node.
2023    fn migrate_nodes(
2024        dst: &mut NodeArena,
2025        src: &mut NodeArena,
2026        src_idx: Index,
2027        dst_slot: Option<Index>,
2028    ) -> Index {
2029        let mut node = src.remove(src_idx).expect("node exists in source arena");
2030
2031        // Recursively migrate children first so their new indices are known.
2032        if let ArenaSparseNode::Branch(b) = &mut node {
2033            for child in &mut b.children {
2034                if let ArenaSparseNodeBranchChild::Revealed(child_idx) = child {
2035                    *child_idx = Self::migrate_nodes(dst, src, *child_idx, None);
2036                }
2037            }
2038        }
2039
2040        if let Some(slot) = dst_slot {
2041            dst[slot] = node;
2042            slot
2043        } else {
2044            dst.insert(node)
2045        }
2046    }
2047
2048    /// Removes a pruned node from the arena and blinds the parent's child slot with the node's
2049    /// cached RLP. If the node has no cached RLP (e.g. it was never hashed), the parent slot
2050    /// is left dangling — this is safe because the parent will also be removed during pruning.
2051    fn remove_pruned_node(
2052        arena: &mut NodeArena,
2053        cursor: &ArenaCursor,
2054        idx: Index,
2055        nibble: Option<u8>,
2056    ) -> ArenaSparseNode {
2057        trace!(target: TRACE_TARGET, path = ?cursor.head().unwrap().path, "pruning node");
2058        let node = arena.remove(idx).expect("node must exist to be pruned");
2059        let rlp_node = node.state_ref().and_then(|s| s.cached_rlp_node()).cloned();
2060
2061        if let Some(rlp_node) = rlp_node {
2062            let parent_idx = cursor.parent().expect("pruned child has parent").index;
2063            let child_nibble = nibble.expect("non-root child");
2064            let parent_branch = arena[parent_idx].branch_mut();
2065            let child_idx = BranchChildIdx::new(parent_branch.state_mask, child_nibble)
2066                .expect("child nibble not found in parent state_mask");
2067            parent_branch.children[child_idx] = ArenaSparseNodeBranchChild::Blinded(rlp_node);
2068        }
2069
2070        node
2071    }
2072
2073    /// Reveals a single proof node using a pre-computed [`SeekResult`] from
2074    /// [`ArenaCursor::seek`].
2075    ///
2076    /// If the result is `Blinded`, the blinded child is replaced with the proof node (converted to
2077    /// an arena node with `Cached` state). All other cases (already revealed, no child, diverged,
2078    /// leaf head) are no-ops — the proof node is skipped.
2079    ///
2080    /// Returns the `Index` of the revealed node in the arena, if any was revealed.
2081    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2082    fn reveal_node(
2083        arena: &mut NodeArena,
2084        cursor: &ArenaCursor,
2085        node: &mut ProofTrieNodeV2,
2086        find_result: SeekResult,
2087    ) -> Option<Index> {
2088        let SeekResult::Blinded = find_result else {
2089            // Already revealed, no child slot, or diverged — skip this proof node.
2090            return None;
2091        };
2092
2093        let head = cursor.head().expect("cursor is non-empty");
2094        let head_idx = head.index;
2095        let head_branch_logical_path = cursor.head_logical_branch_path(arena);
2096
2097        debug_assert_eq!(
2098            node.path.len(),
2099            head_branch_logical_path.len() + 1,
2100            "proof node path {:?} is not a direct child of branch at {:?} (expected depth {})",
2101            node.path,
2102            head_branch_logical_path,
2103            head_branch_logical_path.len() + 1,
2104        );
2105
2106        let child_nibble = node.path.get_unchecked(head_branch_logical_path.len());
2107        let head_branch = arena[head_idx].branch_ref();
2108        let dense_child_idx = BranchChildIdx::new(head_branch.state_mask, child_nibble)
2109            .expect("Blinded result but child nibble not in state_mask");
2110
2111        let cached_rlp = match &head_branch.children[dense_child_idx] {
2112            ArenaSparseNodeBranchChild::Blinded(rlp) => rlp.clone(),
2113            ArenaSparseNodeBranchChild::Revealed(_) => return None,
2114        };
2115
2116        trace!(
2117            target: TRACE_TARGET,
2118            path = ?node.path,
2119            rlp_node = ?cached_rlp,
2120            "Revealing node",
2121        );
2122
2123        let proof_node = mem::replace(node, ProofTrieNodeV2::empty());
2124        let mut arena_node = ArenaSparseNode::from_proof_node(proof_node);
2125
2126        let state = arena_node.state_mut();
2127        *state = ArenaSparseNodeState::Cached { rlp_node: cached_rlp };
2128
2129        let child_idx = arena.insert(arena_node);
2130        arena[head_idx].branch_mut().children[dense_child_idx] =
2131            ArenaSparseNodeBranchChild::Revealed(child_idx);
2132
2133        Some(child_idx)
2134    }
2135
2136    #[cfg(debug_assertions)]
2137    fn collect_reachable_nodes(
2138        arena: &NodeArena,
2139        idx: Index,
2140        reachable: &mut alloy_primitives::map::HashSet<Index>,
2141    ) {
2142        if !reachable.insert(idx) {
2143            return;
2144        }
2145        if let ArenaSparseNode::Branch(b) = &arena[idx] {
2146            for child in &b.children {
2147                if let ArenaSparseNodeBranchChild::Revealed(child_idx) = child {
2148                    Self::collect_reachable_nodes(arena, *child_idx, reachable);
2149                }
2150            }
2151        }
2152    }
2153
2154    #[cfg(debug_assertions)]
2155    fn assert_no_orphaned_nodes(arena: &NodeArena, root: Index, label: &str) {
2156        let mut reachable = alloy_primitives::map::HashSet::default();
2157        Self::collect_reachable_nodes(arena, root, &mut reachable);
2158        let all_indices: alloy_primitives::map::HashSet<Index> =
2159            arena.iter().map(|(idx, _)| idx).collect();
2160        let orphaned: Vec<_> = all_indices.difference(&reachable).collect();
2161        debug_assert!(
2162            orphaned.is_empty(),
2163            "{label} has {} orphaned node(s): {orphaned:?}",
2164            orphaned.len(),
2165        );
2166    }
2167}
2168
2169#[cfg(debug_assertions)]
2170impl Drop for ArenaParallelSparseTrie {
2171    fn drop(&mut self) {
2172        Self::assert_no_orphaned_nodes(&self.upper_arena, self.root, "upper arena");
2173
2174        for (_, node) in &self.upper_arena {
2175            if let Some(subtrie) = node.as_subtrie() {
2176                Self::assert_no_orphaned_nodes(
2177                    &subtrie.arena,
2178                    subtrie.root,
2179                    &alloc::format!("subtrie {:?}", subtrie.path),
2180                );
2181            }
2182        }
2183    }
2184}
2185
2186impl Default for ArenaParallelSparseTrie {
2187    fn default() -> Self {
2188        let mut upper_arena = SlotMap::new();
2189        let root = upper_arena.insert(ArenaSparseNode::EmptyRoot);
2190        Self {
2191            upper_arena,
2192            root,
2193            updates: None,
2194            buffers: ArenaTrieBuffers::default(),
2195            parallelism_thresholds: ArenaParallelismThresholds::default(),
2196            #[cfg(feature = "trie-debug")]
2197            debug_recorder: Default::default(),
2198        }
2199    }
2200}
2201
2202impl ArenaParallelSparseTrie {
2203    /// Hashes a subtrie at `head_idx` and collects its update actions.
2204    fn update_upper_subtrie(&mut self, head_idx: Index) {
2205        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[head_idx] else {
2206            unreachable!()
2207        };
2208
2209        if !subtrie.arena[subtrie.root].is_cached() {
2210            subtrie.update_cached_rlp();
2211        }
2212
2213        Self::merge_subtrie_updates(&mut self.buffers.updates, &mut subtrie.buffers.updates);
2214    }
2215}
2216
2217impl SparseTrie for ArenaParallelSparseTrie {
2218    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2219    fn set_root(
2220        &mut self,
2221        root: TrieNodeV2,
2222        masks: Option<BranchNodeMasks>,
2223        retain_updates: bool,
2224    ) -> SparseTrieResult<()> {
2225        #[cfg(feature = "trie-debug")]
2226        self.debug_recorder.record(RecordedOp::SetRoot {
2227            node: ProofTrieNodeRecord::from_proof_trie_node_v2(&ProofTrieNodeV2 {
2228                path: Nibbles::default(),
2229                node: root.clone(),
2230                masks,
2231            }),
2232        });
2233
2234        debug_assert!(
2235            matches!(self.upper_arena[self.root], ArenaSparseNode::EmptyRoot),
2236            "set_root called on a trie that already has revealed nodes"
2237        );
2238
2239        self.set_updates(retain_updates);
2240
2241        match root {
2242            TrieNodeV2::EmptyRoot => {
2243                trace!(target: TRACE_TARGET, "Setting empty root");
2244            }
2245            TrieNodeV2::Leaf(leaf) => {
2246                trace!(target: TRACE_TARGET, key = ?leaf.key, "Setting leaf root");
2247                self.upper_arena[self.root] = ArenaSparseNode::Leaf {
2248                    state: ArenaSparseNodeState::Revealed,
2249                    key: leaf.key,
2250                    value: leaf.value,
2251                };
2252            }
2253            TrieNodeV2::Branch(branch) => {
2254                trace!(target: TRACE_TARGET, state_mask = ?branch.state_mask, num_children = branch.state_mask.count_bits(), "Setting branch root");
2255                let mut children = SmallVec::with_capacity(branch.state_mask.count_bits() as usize);
2256                for (stack_ptr, _nibble) in branch.state_mask.iter().enumerate() {
2257                    children
2258                        .push(ArenaSparseNodeBranchChild::Blinded(branch.stack[stack_ptr].clone()));
2259                }
2260
2261                self.upper_arena[self.root] = ArenaSparseNode::Branch(ArenaSparseNodeBranch {
2262                    state: ArenaSparseNodeState::Revealed,
2263                    children,
2264                    state_mask: branch.state_mask,
2265                    short_key: branch.key,
2266                    branch_masks: masks.unwrap_or_default(),
2267                });
2268            }
2269            TrieNodeV2::Extension(_) => {
2270                panic!("set_root does not support Extension nodes; extensions are represented as branches with a short_key")
2271            }
2272        }
2273
2274        Ok(())
2275    }
2276
2277    fn set_updates(&mut self, retain_updates: bool) {
2278        self.updates = retain_updates.then(Default::default);
2279        if retain_updates {
2280            self.buffers.updates.get_or_insert_with(SparseTrieUpdates::default).clear();
2281        } else {
2282            self.buffers.updates = None;
2283        }
2284    }
2285
2286    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, fields(num_nodes = nodes.len()))]
2287    fn reveal_nodes(&mut self, nodes: &mut [ProofTrieNodeV2]) -> SparseTrieResult<()> {
2288        if nodes.is_empty() {
2289            return Ok(());
2290        }
2291
2292        #[cfg(feature = "trie-debug")]
2293        self.debug_recorder.record(RecordedOp::RevealNodes {
2294            nodes: nodes.iter().map(ProofTrieNodeRecord::from_proof_trie_node_v2).collect(),
2295        });
2296
2297        if matches!(self.upper_arena[self.root], ArenaSparseNode::EmptyRoot) {
2298            trace!(target: TRACE_TARGET, "Skipping reveal_nodes on empty root");
2299            return Ok(());
2300        }
2301
2302        // Sort nodes lexicographically by path.
2303        nodes.sort_unstable_by_key(|n| n.path);
2304
2305        let threshold = self.parallelism_thresholds.min_revealed_nodes;
2306
2307        // Take the cursor out to avoid borrow conflicts with `self`.
2308        let mut cursor = mem::take(&mut self.buffers.cursor);
2309        cursor.reset(&self.upper_arena, self.root, Nibbles::default());
2310
2311        // Skip root node if present (set_root handles the root).
2312        let mut node_idx = if nodes[0].path.is_empty() { 1 } else { 0 };
2313
2314        // Walk the upper trie, revealing upper nodes inline and collecting subtrie work.
2315        // Subtries with enough nodes to reveal are taken for parallel processing; the rest
2316        // are revealed inline.
2317        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>, Vec<ProofTrieNodeV2>)> = Vec::new();
2318
2319        while node_idx < nodes.len() {
2320            let find_result = cursor.seek(&mut self.upper_arena, &nodes[node_idx].path);
2321
2322            match find_result {
2323                SeekResult::RevealedLeaf => {
2324                    trace!(target: TRACE_TARGET, path = ?nodes[node_idx].path, "Skipping reveal: leaf head");
2325                    node_idx += 1;
2326                }
2327                SeekResult::Blinded => {
2328                    // Save the proof node's path before reveal_node consumes it.
2329                    let child_path = nodes[node_idx].path;
2330                    let child_idx = Self::reveal_node(
2331                        &mut self.upper_arena,
2332                        &cursor,
2333                        &mut nodes[node_idx],
2334                        SeekResult::Blinded,
2335                    );
2336                    node_idx += 1;
2337
2338                    if let Some(child_idx) = child_idx {
2339                        self.maybe_wrap_in_subtrie(child_idx, &child_path);
2340                    }
2341                }
2342                SeekResult::RevealedSubtrie => {
2343                    let subtrie_entry = cursor.head().expect("cursor is non-empty");
2344                    let child_idx = subtrie_entry.index;
2345                    let prefix = subtrie_entry.path;
2346
2347                    let subtrie_start = node_idx;
2348                    while node_idx < nodes.len() && nodes[node_idx].path.starts_with(&prefix) {
2349                        node_idx += 1;
2350                    }
2351                    let num_subtrie_nodes = node_idx - subtrie_start;
2352
2353                    if num_subtrie_nodes >= threshold {
2354                        // Take subtrie for parallel reveal.
2355                        trace!(target: TRACE_TARGET, ?prefix, num_subtrie_nodes, "Taking subtrie for parallel reveal");
2356                        let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
2357                            &mut self.upper_arena[child_idx],
2358                            ArenaSparseNode::TakenSubtrie,
2359                        ) else {
2360                            unreachable!("RevealedSubtrie must point to a Subtrie node")
2361                        };
2362                        let node_vec: Vec<ProofTrieNodeV2> = (subtrie_start..node_idx)
2363                            .map(|i| mem::replace(&mut nodes[i], ProofTrieNodeV2::empty()))
2364                            .collect();
2365                        taken.push((child_idx, subtrie, node_vec));
2366                    } else {
2367                        // Reveal inline.
2368                        trace!(target: TRACE_TARGET, ?prefix, num_subtrie_nodes, "Revealing subtrie inline");
2369                        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[child_idx]
2370                        else {
2371                            unreachable!("RevealedSubtrie must point to a Subtrie node")
2372                        };
2373                        let mut subtrie_nodes: Vec<ProofTrieNodeV2> = (subtrie_start..node_idx)
2374                            .map(|i| mem::replace(&mut nodes[i], ProofTrieNodeV2::empty()))
2375                            .collect();
2376                        subtrie.reveal_nodes(&mut subtrie_nodes)?;
2377                    }
2378                }
2379                _ => {
2380                    trace!(target: TRACE_TARGET, path = ?nodes[node_idx].path, ?find_result, "Skipping reveal: no blinded child");
2381                    node_idx += 1;
2382                }
2383            }
2384        }
2385
2386        // Drain remaining cursor entries from the upper-trie walk.
2387        cursor.drain(&mut self.upper_arena);
2388        self.buffers.cursor = cursor;
2389
2390        if taken.is_empty() {
2391            return Ok(());
2392        }
2393
2394        // Reveal taken subtries, in parallel if more than one.
2395        if taken.len() == 1 {
2396            let (_, subtrie, node_vec) = &mut taken[0];
2397            subtrie.reveal_nodes(node_vec)?;
2398        } else {
2399            use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
2400
2401            let results: Vec<SparseTrieResult<()>> = taken
2402                .par_iter_mut()
2403                .map(|(_, subtrie, node_vec)| subtrie.reveal_nodes(node_vec))
2404                .collect();
2405
2406            if let Some(err) = results.into_iter().find(|r| r.is_err()) {
2407                // Restore before returning so we don't leave TakenSubtrie holes.
2408                for (idx, subtrie, _) in taken {
2409                    self.upper_arena[idx] = ArenaSparseNode::Subtrie(subtrie);
2410                }
2411                return err;
2412            }
2413        }
2414
2415        // Restore taken subtries into the upper arena.
2416        for (idx, subtrie, _) in taken {
2417            self.upper_arena[idx] = ArenaSparseNode::Subtrie(subtrie);
2418        }
2419
2420        #[cfg(debug_assertions)]
2421        self.debug_assert_subtrie_structure();
2422
2423        Ok(())
2424    }
2425
2426    #[instrument(level = "trace", target = TRACE_TARGET, skip_all, ret)]
2427    fn root(&mut self) -> B256 {
2428        #[cfg(feature = "trie-debug")]
2429        self.debug_recorder.record(RecordedOp::Root);
2430
2431        self.update_subtrie_hashes();
2432
2433        // Merge buffered subtrie updates into self.updates before hashing the upper trie,
2434        // which will add its own updates directly into self.updates.
2435        Self::merge_subtrie_updates(&mut self.updates, &mut self.buffers.updates);
2436
2437        let rlp_node = Self::update_cached_rlp(
2438            &mut self.upper_arena,
2439            self.root,
2440            &mut self.buffers.cursor,
2441            &mut self.buffers.rlp_buf,
2442            &mut self.buffers.rlp_node_buf,
2443            Nibbles::default(),
2444            &mut self.updates,
2445        );
2446
2447        rlp_node.as_hash().expect("root RlpNode must be a hash")
2448    }
2449
2450    fn is_root_cached(&self) -> bool {
2451        self.upper_arena[self.root].is_cached()
2452    }
2453
2454    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2455    fn update_subtrie_hashes(&mut self) {
2456        #[cfg(feature = "trie-debug")]
2457        self.debug_recorder.record(RecordedOp::UpdateSubtrieHashes);
2458
2459        trace!(target: TRACE_TARGET, "Updating subtrie hashes");
2460
2461        // Only descend if the root is a branch; otherwise there are no subtries.
2462        if !matches!(&self.upper_arena[self.root], ArenaSparseNode::Branch(_)) {
2463            return;
2464        }
2465
2466        // Count total dirty leaves across all subtries to make one global parallelism decision.
2467        let mut total_dirty_leaves: u64 = 0;
2468        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>)> = Vec::new();
2469        for (idx, node) in &mut self.upper_arena {
2470            let ArenaSparseNode::Subtrie(s) = node else { continue };
2471            if s.num_dirty_leaves == 0 {
2472                continue;
2473            }
2474            total_dirty_leaves += s.num_dirty_leaves;
2475            let ArenaSparseNode::Subtrie(subtrie) =
2476                mem::replace(node, ArenaSparseNode::TakenSubtrie)
2477            else {
2478                unreachable!()
2479            };
2480            taken.push((idx, subtrie));
2481        }
2482
2483        // Hash taken subtries in parallel if total dirty leaves meet the threshold.
2484        if !taken.is_empty() {
2485            if taken.len() == 1 || total_dirty_leaves < self.parallelism_thresholds.min_dirty_leaves
2486            {
2487                for (_, subtrie) in &mut taken {
2488                    subtrie.update_cached_rlp();
2489                }
2490            } else {
2491                use rayon::iter::{IntoParallelIterator, ParallelIterator};
2492
2493                taken = taken
2494                    .into_par_iter()
2495                    .map(|(idx, mut subtrie)| {
2496                        subtrie.update_cached_rlp();
2497                        (idx, subtrie)
2498                    })
2499                    .collect();
2500            }
2501        }
2502
2503        // If the root branch is already cached and nothing was taken for parallel
2504        // hashing, there are no dirty subtries to process.
2505        if taken.is_empty() && self.upper_arena[self.root].is_cached() {
2506            return;
2507        }
2508
2509        // Walk the upper trie depth-first, restoring hashed subtries and inline-hashing
2510        // any remaining dirty subtries. Only descend into dirty branches; clean subtrees
2511        // cannot contain dirty subtries since dirty state propagates upward.
2512        taken.sort_unstable_by_key(|(_, b)| Reverse(b.path));
2513
2514        self.buffers.cursor.reset(&self.upper_arena, self.root, Nibbles::default());
2515
2516        loop {
2517            let result = self.buffers.cursor.next(&mut self.upper_arena, |_, child| match child {
2518                ArenaSparseNode::Branch(_) | ArenaSparseNode::Subtrie(_) => !child.is_cached(),
2519                ArenaSparseNode::TakenSubtrie => true,
2520                _ => false,
2521            });
2522
2523            match result {
2524                NextResult::Done => break,
2525                NextResult::Branch => continue,
2526                NextResult::NonBranch => {}
2527            }
2528
2529            // Head is a subtrie or taken-subtrie — process it.
2530            let head_idx = self.buffers.cursor.head().expect("cursor is non-empty").index;
2531
2532            if matches!(&self.upper_arena[head_idx], ArenaSparseNode::TakenSubtrie) {
2533                let (_, subtrie) = taken.pop().expect("taken subtries must not be exhausted");
2534                debug_assert_eq!(
2535                    subtrie.path,
2536                    self.buffers.cursor.head().expect("cursor is non-empty").path,
2537                    "taken subtrie path mismatch",
2538                );
2539                self.upper_arena[head_idx] = ArenaSparseNode::Subtrie(subtrie);
2540            }
2541
2542            self.update_upper_subtrie(head_idx);
2543        }
2544    }
2545
2546    fn get_leaf_value(&self, full_path: &Nibbles) -> Option<&Vec<u8>> {
2547        Self::get_leaf_value_in_arena(&self.upper_arena, self.root, full_path, 0)
2548    }
2549
2550    fn find_leaf(
2551        &self,
2552        full_path: &Nibbles,
2553        expected_value: Option<&Vec<u8>>,
2554    ) -> Result<LeafLookup, LeafLookupError> {
2555        Self::find_leaf_in_arena(&self.upper_arena, self.root, full_path, 0, expected_value)
2556    }
2557
2558    fn updates_ref(&self) -> Cow<'_, SparseTrieUpdates> {
2559        self.updates.as_ref().map_or(Cow::Owned(SparseTrieUpdates::default()), Cow::Borrowed)
2560    }
2561
2562    fn take_updates(&mut self) -> SparseTrieUpdates {
2563        match self.updates.take() {
2564            Some(updates) => {
2565                self.updates = Some(SparseTrieUpdates::with_capacity(
2566                    updates.updated_nodes.len(),
2567                    updates.removed_nodes.len(),
2568                ));
2569                updates
2570            }
2571            None => SparseTrieUpdates::default(),
2572        }
2573    }
2574
2575    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2576    fn wipe(&mut self) {
2577        trace!(target: TRACE_TARGET, "Wiping arena trie");
2578        self.clear();
2579        self.updates = self.updates.is_some().then(SparseTrieUpdates::wiped);
2580    }
2581
2582    #[instrument(level = "trace", target = TRACE_TARGET, skip_all)]
2583    fn clear(&mut self) {
2584        #[cfg(feature = "trie-debug")]
2585        self.debug_recorder.reset();
2586
2587        self.upper_arena = SlotMap::new();
2588        self.root = self.upper_arena.insert(ArenaSparseNode::EmptyRoot);
2589        if let Some(updates) = self.updates.as_mut() {
2590            updates.clear()
2591        }
2592        self.buffers.clear();
2593    }
2594
2595    fn size_hint(&self) -> usize {
2596        self.upper_arena
2597            .iter()
2598            .map(|(_, node)| match node {
2599                ArenaSparseNode::Subtrie(s) => s.num_leaves as usize,
2600                ArenaSparseNode::Leaf { .. } => 1,
2601                _ => 0,
2602            })
2603            .sum()
2604    }
2605
2606    fn memory_size(&self) -> usize {
2607        let slot_size = slotmap_slot_size::<ArenaSparseNode>();
2608
2609        // Upper arena: capacity × slot size.
2610        let upper = self.upper_arena.capacity() * slot_size;
2611
2612        // Active subtries: cached total memory from last prune.
2613        let subtrie_size: usize = self
2614            .upper_arena
2615            .iter()
2616            .filter_map(|(_, node)| match node {
2617                ArenaSparseNode::Subtrie(s) => Some(s.memory_size()),
2618                _ => None,
2619            })
2620            .sum();
2621
2622        // RLP buffers.
2623        let buffer_size = self.buffers.rlp_buf.capacity() +
2624            self.buffers.rlp_node_buf.capacity() * core::mem::size_of::<RlpNode>();
2625
2626        upper + subtrie_size + buffer_size
2627    }
2628
2629    #[instrument(
2630        level = "trace",
2631        target = TRACE_TARGET,
2632        skip_all,
2633        fields(num_retained_leaves = retained_leaves.len()),
2634    )]
2635    fn prune(&mut self, retained_leaves: &[Nibbles]) -> usize {
2636        // Only descend if the root is a branch; otherwise there are no subtries.
2637        if !matches!(&self.upper_arena[self.root], ArenaSparseNode::Branch(_)) {
2638            return 0;
2639        }
2640
2641        let mut retained_leaves = retained_leaves.to_vec();
2642        retained_leaves.sort_unstable();
2643
2644        let threshold = self.parallelism_thresholds.min_leaves_for_prune;
2645
2646        let mut cursor = mem::take(&mut self.buffers.cursor);
2647        cursor.reset(&self.upper_arena, self.root, Nibbles::default());
2648
2649        // Subtries taken for parallel pruning: (arena_index, subtrie, retained_range).
2650        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>, core::ops::Range<usize>)> = Vec::new();
2651
2652        let mut pruned = 0;
2653        let mut retained_idx = 0;
2654
2655        loop {
2656            let result = cursor.next(&mut self.upper_arena, |_, child| {
2657                matches!(
2658                    child,
2659                    ArenaSparseNode::Branch(_) |
2660                        ArenaSparseNode::Subtrie(_) |
2661                        ArenaSparseNode::Leaf { .. }
2662                )
2663            });
2664
2665            match result {
2666                NextResult::Done => break,
2667                NextResult::NonBranch | NextResult::Branch => {}
2668            }
2669
2670            let head = cursor.head().expect("cursor is non-empty");
2671            let head_idx = head.index;
2672            let head_path = head.path;
2673
2674            match &self.upper_arena[head_idx] {
2675                ArenaSparseNode::Branch(_) | ArenaSparseNode::Leaf { .. } => {
2676                    // Don't prune the root.
2677                    if cursor.depth() == 0 {
2678                        continue;
2679                    }
2680
2681                    let short_key =
2682                        self.upper_arena[head_idx].short_key().expect("must be branch or leaf");
2683                    let mut node_prefix = head_path;
2684                    node_prefix.extend(short_key);
2685
2686                    let range = prefix_range(&retained_leaves, 0, &node_prefix);
2687                    if !range.is_empty() {
2688                        continue;
2689                    }
2690
2691                    Self::remove_pruned_node(
2692                        &mut self.upper_arena,
2693                        &cursor,
2694                        head_idx,
2695                        head_path.last(),
2696                    );
2697                    pruned += 1;
2698                }
2699                ArenaSparseNode::Subtrie(_) => {
2700                    let subtrie_range = prefix_range(&retained_leaves, retained_idx, &head_path);
2701                    retained_idx = subtrie_range.end;
2702
2703                    if subtrie_range.is_empty() {
2704                        let removed = Self::remove_pruned_node(
2705                            &mut self.upper_arena,
2706                            &cursor,
2707                            head_idx,
2708                            head_path.last(),
2709                        );
2710                        let ArenaSparseNode::Subtrie(s) = &removed else { unreachable!() };
2711                        pruned += s.num_leaves as usize;
2712                        self.recycle_subtrie(removed);
2713                        continue;
2714                    }
2715
2716                    let ArenaSparseNode::Subtrie(subtrie) = &self.upper_arena[head_idx] else {
2717                        unreachable!()
2718                    };
2719                    if subtrie.num_leaves >= threshold {
2720                        let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
2721                            &mut self.upper_arena[head_idx],
2722                            ArenaSparseNode::TakenSubtrie,
2723                        ) else {
2724                            unreachable!()
2725                        };
2726                        taken.push((head_idx, subtrie, subtrie_range));
2727                    } else {
2728                        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[head_idx]
2729                        else {
2730                            unreachable!()
2731                        };
2732                        pruned += subtrie.prune(&retained_leaves[subtrie_range]);
2733                    }
2734                }
2735                _ => unreachable!("NonBranch in prune walk must be Subtrie, Leaf, or Branch"),
2736            }
2737        }
2738
2739        self.buffers.cursor = cursor;
2740
2741        if !taken.is_empty() {
2742            // Prune taken subtries, in parallel if more than one.
2743            if taken.len() == 1 {
2744                let (_, ref mut subtrie, ref range) = taken[0];
2745                pruned += subtrie.prune(&retained_leaves[range.clone()]);
2746            } else {
2747                use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
2748
2749                pruned += taken
2750                    .par_iter_mut()
2751                    .map(|(_, subtrie, range)| subtrie.prune(&retained_leaves[range.clone()]))
2752                    .sum::<usize>();
2753            }
2754
2755            // Restore taken subtries into the upper arena.
2756            for (child_idx, subtrie, _) in taken {
2757                self.upper_arena[child_idx] = ArenaSparseNode::Subtrie(subtrie);
2758            }
2759        }
2760
2761        if pruned > 0 {
2762            compact_arena(&mut self.upper_arena, &mut self.root);
2763        }
2764
2765        #[cfg(feature = "trie-debug")]
2766        self.record_initial_state();
2767
2768        pruned
2769    }
2770
2771    #[instrument(
2772        level = "trace",
2773        target = TRACE_TARGET,
2774        skip_all,
2775        fields(num_updates = updates.len()),
2776    )]
2777    fn update_leaves(
2778        &mut self,
2779        updates: &mut B256Map<LeafUpdate>,
2780        mut proof_required_fn: impl FnMut(B256, u8),
2781    ) -> SparseTrieResult<()> {
2782        if updates.is_empty() {
2783            return Ok(());
2784        }
2785
2786        #[cfg(feature = "trie-debug")]
2787        let recorded_updates: Vec<_> =
2788            updates.iter().map(|(k, v)| (*k, LeafUpdateRecord::from(v))).collect();
2789        #[cfg(feature = "trie-debug")]
2790        let mut recorded_proof_targets: Vec<(B256, u8)> = Vec::new();
2791
2792        // Drain and sort updates lexicographically by nibbles path.
2793        let mut sorted: Vec<_> =
2794            updates.drain().map(|(key, update)| (key, Nibbles::unpack(key), update)).collect();
2795        sorted.sort_unstable_by_key(|entry| entry.1);
2796
2797        let threshold = self.parallelism_thresholds.min_updates;
2798        let parallelize_distributed_updates = sorted.len() >= threshold.saturating_mul(4);
2799
2800        let mut cursor = mem::take(&mut self.buffers.cursor);
2801        cursor.reset(&self.upper_arena, self.root, Nibbles::default());
2802
2803        // Subtries taken for parallel processing: (arena_index, subtrie, update_range).
2804        let mut taken: Vec<(Index, Box<ArenaSparseSubtrie>, core::ops::Range<usize>)> = Vec::new();
2805
2806        let mut update_idx = 0;
2807        while update_idx < sorted.len() {
2808            let (key, ref full_path, ref update) = sorted[update_idx];
2809
2810            let find_result = cursor.seek(&mut self.upper_arena, full_path);
2811
2812            match find_result {
2813                // Blinded — request a proof regardless of update type.
2814                SeekResult::Blinded => {
2815                    let logical_len = cursor.head_logical_branch_path_len(&self.upper_arena);
2816                    let min_len = (logical_len as u8 + 1).min(64);
2817                    trace!(target: TRACE_TARGET, ?key, min_len, "Update hit blinded node, requesting proof");
2818                    proof_required_fn(key, min_len);
2819                    #[cfg(feature = "trie-debug")]
2820                    recorded_proof_targets.push((key, min_len));
2821                    updates.insert(key, update.clone());
2822                }
2823                // Subtrie — forward all consecutive updates under this subtrie's prefix.
2824                SeekResult::RevealedSubtrie => {
2825                    let subtrie_entry = cursor.head().expect("cursor is non-empty");
2826                    let child_idx = subtrie_entry.index;
2827                    let subtrie_root_path = subtrie_entry.path;
2828
2829                    let subtrie_start = update_idx;
2830                    while update_idx < sorted.len() &&
2831                        sorted[update_idx].1.starts_with(&subtrie_root_path)
2832                    {
2833                        update_idx += 1;
2834                    }
2835
2836                    let subtrie_updates = &sorted[subtrie_start..update_idx];
2837
2838                    // Edge-case: if all updates are removals that could empty the
2839                    // subtrie and collapse the parent onto a blinded sibling, request
2840                    // a proof for the sibling and skip the subtrie's updates.
2841                    if let Some(proof) = Self::check_subtrie_collapse_needs_proof(
2842                        &self.upper_arena,
2843                        &cursor,
2844                        subtrie_updates,
2845                    ) {
2846                        trace!(target: TRACE_TARGET, proof_key = ?proof.key, proof_min_len = proof.min_len, "Subtrie collapse would need blinded sibling, requesting proof");
2847                        proof_required_fn(proof.key, proof.min_len);
2848                        #[cfg(feature = "trie-debug")]
2849                        recorded_proof_targets.push((proof.key, proof.min_len));
2850                        for &(key, _, ref update) in subtrie_updates {
2851                            updates.insert(key, update.clone());
2852                        }
2853                        // Pop the subtrie entry before continuing.
2854                        continue;
2855                    }
2856
2857                    let num_subtrie_updates = update_idx - subtrie_start;
2858
2859                    // If all updates are removals and could empty the subtrie,
2860                    // force inline processing so the upper-arena collapse logic
2861                    // can detect blinded siblings and request proofs.
2862                    let all_removals = subtrie_updates
2863                        .iter()
2864                        // Filter out Touched, as they don't affect the structure of the trie. So an
2865                        // update set with 2 removals and one Touched could still result in an empty
2866                        // sub trie.
2867                        .filter(|(_, _, u)| matches!(u, LeafUpdate::Changed(_)))
2868                        .all(|(_, _, u)| matches!(u, LeafUpdate::Changed(v) if v.is_empty()));
2869                    let subtrie_num_leaves = match &self.upper_arena[child_idx] {
2870                        ArenaSparseNode::Subtrie(s) => s.num_leaves,
2871                        _ => 0,
2872                    };
2873                    let might_empty_subtrie =
2874                        all_removals && num_subtrie_updates as u64 >= subtrie_num_leaves;
2875
2876                    if (num_subtrie_updates >= threshold || parallelize_distributed_updates) &&
2877                        !might_empty_subtrie
2878                    {
2879                        // Take subtrie for parallel update.
2880                        trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Taking subtrie for parallel update");
2881                        let ArenaSparseNode::Subtrie(subtrie) = mem::replace(
2882                            &mut self.upper_arena[child_idx],
2883                            ArenaSparseNode::TakenSubtrie,
2884                        ) else {
2885                            unreachable!()
2886                        };
2887                        taken.push((child_idx, subtrie, subtrie_start..update_idx));
2888                    } else {
2889                        // Update inline.
2890                        trace!(target: TRACE_TARGET, ?subtrie_root_path, num_subtrie_updates, "Updating subtrie inline");
2891                        let ArenaSparseNode::Subtrie(subtrie) = &mut self.upper_arena[child_idx]
2892                        else {
2893                            unreachable!()
2894                        };
2895
2896                        subtrie.update_leaves(subtrie_updates);
2897
2898                        for (target_idx, proof) in subtrie.required_proofs.drain(..) {
2899                            proof_required_fn(proof.key, proof.min_len);
2900                            #[cfg(feature = "trie-debug")]
2901                            recorded_proof_targets.push((proof.key, proof.min_len));
2902                            let (key, _, ref update) = subtrie_updates[target_idx];
2903                            updates.insert(key, update.clone());
2904                        }
2905
2906                        // Check if the subtrie's root became empty after updates.
2907                        self.maybe_unwrap_subtrie(&mut cursor);
2908                    }
2909
2910                    // Don't increment update_idx — already advanced past subtrie updates.
2911                    continue;
2912                }
2913                // EmptyRoot, leaf, diverged branch, or empty child slot — upsert directly.
2914                find_result @ (SeekResult::EmptyRoot |
2915                SeekResult::RevealedLeaf |
2916                SeekResult::Diverged |
2917                SeekResult::NoChild { .. }) => match update {
2918                    LeafUpdate::Changed(v) if !v.is_empty() => {
2919                        let (result, _deltas) = Self::upsert_leaf(
2920                            &mut self.upper_arena,
2921                            &mut cursor,
2922                            &mut self.root,
2923                            full_path,
2924                            v,
2925                            find_result,
2926                        );
2927                        match result {
2928                            UpsertLeafResult::NewChild => {
2929                                let head = cursor.head().expect("cursor is non-empty");
2930                                if Self::should_be_subtrie(head.path.len()) {
2931                                    // The new child itself sits at the subtrie
2932                                    // boundary — wrap it directly.
2933                                    self.maybe_wrap_in_subtrie(head.index, &head.path);
2934                                } else {
2935                                    // The new child is above the boundary (e.g. a
2936                                    // split at depth 1 creates children at depth 2).
2937                                    // Wrap any of its children that land there.
2938                                    self.maybe_wrap_branch_children(&cursor);
2939                                }
2940                            }
2941                            UpsertLeafResult::NewLeaf => {
2942                                // A root-level split may create children at the
2943                                // subtrie boundary depth. Wrap them.
2944                                self.maybe_wrap_branch_children(&cursor);
2945                            }
2946                            UpsertLeafResult::Updated => {}
2947                        }
2948                    }
2949                    LeafUpdate::Changed(_) => {
2950                        let (result, _deltas) = Self::remove_leaf(
2951                            &mut self.upper_arena,
2952                            &mut cursor,
2953                            &mut self.root,
2954                            key,
2955                            full_path,
2956                            find_result,
2957                            &mut self.buffers.updates,
2958                        );
2959                        match result {
2960                            RemoveLeafResult::NeedsProof { key, proof_key, min_len } => {
2961                                proof_required_fn(proof_key, min_len);
2962                                #[cfg(feature = "trie-debug")]
2963                                recorded_proof_targets.push((proof_key, min_len));
2964                                let update =
2965                                    mem::replace(&mut sorted[update_idx].2, LeafUpdate::Touched);
2966                                updates.insert(key, update);
2967                            }
2968                            RemoveLeafResult::Removed => {
2969                                // remove_leaf may have called collapse_branch, which
2970                                // can leave structural invariants violated:
2971                                // 1. A branch with 0-1 children that needs further collapse or
2972                                //    removal.
2973                                // 2. A Subtrie at a depth shallower than UPPER_TRIE_MAX_DEPTH that
2974                                //    needs unwrapping.
2975                                // 3. A non-Subtrie node at UPPER_TRIE_MAX_DEPTH that needs
2976                                //    wrapping.
2977                                self.maybe_collapse_or_remove_branch(&mut cursor);
2978                                let head =
2979                                    cursor.head().expect("cursor always has root after collapse");
2980                                self.maybe_wrap_in_subtrie(head.index, &head.path);
2981                            }
2982                            RemoveLeafResult::NotFound => {}
2983                        }
2984                    }
2985                    LeafUpdate::Touched => {}
2986                },
2987            }
2988
2989            update_idx += 1;
2990        }
2991
2992        // Drain remaining cursor entries from the upper-trie walk.
2993        cursor.drain(&mut self.upper_arena);
2994        self.buffers.cursor = cursor;
2995
2996        if taken.is_empty() {
2997            #[cfg(debug_assertions)]
2998            self.debug_assert_subtrie_structure();
2999
3000            #[cfg(feature = "trie-debug")]
3001            self.debug_recorder.record(RecordedOp::UpdateLeaves {
3002                updates: recorded_updates,
3003                proof_targets: recorded_proof_targets,
3004            });
3005
3006            return Ok(());
3007        }
3008
3009        // Apply updates to taken subtries, in parallel if more than one.
3010        if taken.len() == 1 {
3011            let (_, ref mut subtrie, ref range) = taken[0];
3012            subtrie.update_leaves(&sorted[range.clone()]);
3013        } else {
3014            use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
3015
3016            taken.par_iter_mut().for_each(|(_, subtrie, range)| {
3017                subtrie.update_leaves(&sorted[range.clone()]);
3018            });
3019        }
3020
3021        // Collect subtrie paths before consuming `taken`, then restore subtries and
3022        // process required proofs.
3023        let taken_paths: Vec<Nibbles> = taken.iter().map(|(_, s, _)| s.path).collect();
3024        for (child_idx, mut subtrie, range) in taken {
3025            let subtrie_updates = &sorted[range];
3026            for (target_idx, proof) in subtrie.required_proofs.drain(..) {
3027                proof_required_fn(proof.key, proof.min_len);
3028                #[cfg(feature = "trie-debug")]
3029                recorded_proof_targets.push((proof.key, proof.min_len));
3030                let (key, _, ref update) = subtrie_updates[target_idx];
3031                updates.insert(key, update.clone());
3032            }
3033
3034            // Restore the subtrie into the upper arena.
3035            self.upper_arena[child_idx] = ArenaSparseNode::Subtrie(subtrie);
3036        }
3037
3038        // Navigate to each taken subtrie via seek to propagate dirty state
3039        // through intermediate branches. Taken subtries are guaranteed not to
3040        // become EmptyRoot (the would-empty-subtrie check above forces those
3041        // inline), so we only need to handle sibling collapses that may have
3042        // occurred during inline processing while this subtrie was taken.
3043        {
3044            let mut cursor = mem::take(&mut self.buffers.cursor);
3045            cursor.reset(&self.upper_arena, self.root, Nibbles::default());
3046
3047            for path in &taken_paths {
3048                let find_result = cursor.seek(&mut self.upper_arena, path);
3049                match find_result {
3050                    SeekResult::RevealedSubtrie => {
3051                        debug_assert!(
3052                            {
3053                                let head_idx = cursor.head().expect("cursor is non-empty").index;
3054                                !matches!(
3055                                    &self.upper_arena[head_idx],
3056                                    ArenaSparseNode::Subtrie(s) if matches!(s.arena[s.root], ArenaSparseNode::EmptyRoot)
3057                                )
3058                            },
3059                            "taken subtrie became EmptyRoot — should have been forced inline"
3060                        );
3061
3062                        cursor.pop(&mut self.upper_arena);
3063
3064                        // The parent branch (now at cursor top) may have had a sibling
3065                        // removed during inline processing while this subtrie was taken.
3066                        // Handle any necessary collapse or removal.
3067                        self.maybe_collapse_or_remove_branch(&mut cursor);
3068                    }
3069                    _ => {
3070                        // Subtrie was already unwrapped by a prior collapse; dirty state
3071                        // was propagated during that collapse. Nothing to do.
3072                    }
3073                }
3074            }
3075
3076            cursor.drain(&mut self.upper_arena);
3077            self.buffers.cursor = cursor;
3078        }
3079
3080        #[cfg(debug_assertions)]
3081        self.debug_assert_subtrie_structure();
3082
3083        #[cfg(feature = "trie-debug")]
3084        self.debug_recorder.record(RecordedOp::UpdateLeaves {
3085            updates: recorded_updates,
3086            proof_targets: recorded_proof_targets,
3087        });
3088
3089        Ok(())
3090    }
3091
3092    #[cfg(feature = "trie-debug")]
3093    fn take_debug_recorder(&mut self) -> TrieDebugRecorder {
3094        core::mem::take(&mut self.debug_recorder)
3095    }
3096}
3097
3098#[cfg(test)]
3099mod tests {
3100    use super::TRACE_TARGET;
3101    use crate::{ArenaParallelSparseTrie, ArenaParallelismThresholds, LeafUpdate, SparseTrie};
3102    use alloy_primitives::{map::B256Map, B256, U256};
3103    use rand::{seq::SliceRandom, Rng, SeedableRng};
3104    use reth_trie::test_utils::TrieTestHarness;
3105    use reth_trie_common::{Nibbles, ProofV2Target};
3106    use std::collections::BTreeMap;
3107    use tracing::{info, trace};
3108
3109    /// Test harness for proptest-based arena sparse trie testing.
3110    ///
3111    /// Wraps [`TrieTestHarness`] and adds `ArenaParallelSparseTrie`-specific helpers for
3112    /// the reveal-update loop and asserting that sparse trie updates match `StorageRoot`.
3113    struct ArenaTrieTestHarness {
3114        /// The inner general-purpose harness.
3115        inner: TrieTestHarness,
3116    }
3117
3118    impl std::ops::Deref for ArenaTrieTestHarness {
3119        type Target = TrieTestHarness;
3120        fn deref(&self) -> &Self::Target {
3121            &self.inner
3122        }
3123    }
3124
3125    impl std::ops::DerefMut for ArenaTrieTestHarness {
3126        fn deref_mut(&mut self) -> &mut Self::Target {
3127            &mut self.inner
3128        }
3129    }
3130
3131    impl ArenaTrieTestHarness {
3132        /// Creates a new test harness from a map of hashed storage slots to values.
3133        fn new(storage: BTreeMap<B256, U256>) -> Self {
3134            Self { inner: TrieTestHarness::new(storage) }
3135        }
3136
3137        /// Computes the new storage root and trie updates after applying the given changes
3138        /// using both `StorageRoot` and the provided `ArenaParallelSparseTrie`, then asserts
3139        /// they match.
3140        fn assert_changes(
3141            &self,
3142            apst: &mut ArenaParallelSparseTrie,
3143            changes: BTreeMap<B256, U256>,
3144        ) {
3145            // Compute expected root and trie updates via StorageRoot.
3146            let (expected_root, mut expected_trie_updates) = if changes.is_empty() {
3147                (self.original_root(), Default::default())
3148            } else {
3149                self.get_root_with_updates(&changes)
3150            };
3151
3152            self.minimize_trie_updates(&mut expected_trie_updates);
3153
3154            // Build leaf updates for the APST: non-zero values are upserts (RLP-encoded),
3155            // zero values are deletions (empty vec).
3156            let mut leaf_updates: B256Map<LeafUpdate> = changes
3157                .iter()
3158                .map(|(&slot, &value)| {
3159                    let rlp_value = if value == U256::ZERO {
3160                        Vec::new()
3161                    } else {
3162                        alloy_rlp::encode_fixed_size(&value).to_vec()
3163                    };
3164                    (slot, LeafUpdate::Changed(rlp_value))
3165                })
3166                .collect();
3167
3168            // Reveal-update loop: call update_leaves, collect required proofs, fetch them,
3169            // reveal, and repeat until no more proofs are needed.
3170            loop {
3171                let mut targets: Vec<ProofV2Target> = Vec::new();
3172                apst.update_leaves(&mut leaf_updates, |key, min_len| {
3173                    targets.push(ProofV2Target::new(key).with_min_len(min_len));
3174                })
3175                .expect("update_leaves should succeed");
3176
3177                if targets.is_empty() {
3178                    break;
3179                }
3180
3181                let (mut proof_nodes, _) = self.proof_v2(&mut targets);
3182                apst.reveal_nodes(&mut proof_nodes).expect("reveal_nodes should succeed");
3183            }
3184
3185            // Compute root and take updates from the APST.
3186            let actual_root = apst.root();
3187            let mut actual_updates = apst.take_updates();
3188
3189            // Minimize sparse updates inline (can't use TrieTestHarness::minimize_sparse_updates
3190            // due to the crate's SparseTrieUpdates being a different type than reth-trie's copy).
3191            actual_updates.updated_nodes.retain(|path, node| {
3192                self.storage_trie_updates().storage_nodes.get(path) != Some(node)
3193            });
3194            actual_updates
3195                .removed_nodes
3196                .retain(|path| self.storage_trie_updates().storage_nodes.contains_key(path));
3197
3198            pretty_assertions::assert_eq!(
3199                expected_trie_updates.storage_nodes.into_iter().collect::<Vec<_>>().sort(),
3200                actual_updates.updated_nodes.into_iter().collect::<Vec<_>>().sort(),
3201                "updated nodes mismatch"
3202            );
3203            pretty_assertions::assert_eq!(
3204                expected_trie_updates.removed_nodes.into_iter().collect::<Vec<_>>().sort(),
3205                actual_updates.removed_nodes.into_iter().collect::<Vec<_>>().sort(),
3206                "removed nodes mismatch"
3207            );
3208            assert_eq!(expected_root, actual_root, "storage root mismatch");
3209        }
3210    }
3211
3212    use proptest::prelude::*;
3213    use proptest_arbitrary_interop::arb;
3214
3215    /// Builds a changeset by mixing `new_keys` (fresh insertions) with a fraction of
3216    /// existing keys from `base` (updates/deletions).
3217    ///
3218    /// `overlap_pct` controls how many existing keys are included, and `delete_pct`
3219    /// controls how many of those become deletions (zero values). The remaining
3220    /// overlap keys get random non-zero values.
3221    fn build_changeset(
3222        base: &BTreeMap<B256, U256>,
3223        new_keys: BTreeMap<B256, U256>,
3224        overlap_pct: f64,
3225        delete_pct: f64,
3226        rng: &mut rand::rngs::StdRng,
3227    ) -> BTreeMap<B256, U256> {
3228        let num_overlap = (base.len() as f64 * overlap_pct) as usize;
3229        let num_delete = (num_overlap as f64 * delete_pct) as usize;
3230
3231        let mut all_keys: Vec<B256> = base.keys().copied().collect();
3232        all_keys.shuffle(rng);
3233        let overlap_keys = &all_keys[..num_overlap];
3234
3235        let mut changeset = new_keys;
3236        for (i, &key) in overlap_keys.iter().enumerate() {
3237            let value =
3238                if i < num_delete { U256::ZERO } else { U256::from(rng.random::<u64>() | 1) };
3239            changeset.entry(key).or_insert(value);
3240        }
3241        changeset
3242    }
3243
3244    proptest! {
3245        #![proptest_config(ProptestConfig::with_cases(1000))]
3246        #[test]
3247        fn arena_trie_proptest(
3248            initial in proptest::collection::btree_map(arb::<B256>(), arb::<U256>(), 0..=100usize),
3249            changeset1_new_keys in proptest::collection::btree_map(arb::<B256>(), arb::<U256>(), 0..=30usize),
3250            changeset2_new_keys in proptest::collection::btree_map(arb::<B256>(), arb::<U256>(), 0..=30usize),
3251            overlap_pct in 0.0..=0.5f64,
3252            delete_pct in 0.0..=0.33f64, // percent of overlapping changeset which are deletes
3253            shuffle_seed in arb::<u64>(),
3254        ) {
3255            reth_tracing::init_test_tracing();
3256            info!(target: TRACE_TARGET, ?shuffle_seed, "PROPTEST START");
3257
3258            // Filter out zero-valued entries from the initial dataset (zeros mean "absent").
3259            let initial: BTreeMap<B256, U256> = initial.into_iter()
3260                .filter(|(_, v)| *v != U256::ZERO)
3261                .collect();
3262
3263            let mut rng = rand::rngs::StdRng::seed_from_u64(shuffle_seed);
3264
3265            let changeset1 = build_changeset(&initial, changeset1_new_keys, overlap_pct, delete_pct, &mut rng);
3266            for (i, (k, v)) in changeset1.iter().enumerate() {
3267                trace!(target: TRACE_TARGET, ?i, ?k, ?v, "Changeset 1 entry");
3268            }
3269
3270            let mut harness = ArenaTrieTestHarness::new(initial);
3271
3272            // Initialize the APST from the harness root node.
3273            let root_node = harness.root_node();
3274            let mut apst = ArenaParallelSparseTrie::default().with_parallelism_thresholds(
3275                ArenaParallelismThresholds {
3276                    min_dirty_leaves: 3,
3277                    min_revealed_nodes: 3,
3278                    min_updates: 3,
3279                    min_leaves_for_prune: 3,
3280                },
3281            );
3282            apst.set_root(root_node.node, root_node.masks, true).expect("set_root should succeed");
3283
3284            harness.assert_changes(&mut apst, changeset1.clone());
3285
3286            // Update the harness base dataset to reflect the first changeset.
3287            harness.apply_changeset(changeset1);
3288
3289            // Pick N random keys from the current storage as retained leaves for pruning.
3290            let mut all_storage_keys: Vec<Nibbles> = harness.storage().keys()
3291                .map(|k| Nibbles::unpack(*k))
3292                .collect();
3293            all_storage_keys.shuffle(&mut rng);
3294            let num_retain = if all_storage_keys.is_empty() { 0 } else {
3295                rng.random_range(0..=all_storage_keys.len())
3296            };
3297            let retained: Vec<Nibbles> = all_storage_keys[..num_retain].to_vec();
3298            apst.prune(&retained);
3299
3300            let changeset2 = build_changeset(harness.storage(), changeset2_new_keys, overlap_pct, delete_pct, &mut rng);
3301            for (i, (k, v)) in changeset2.iter().enumerate() {
3302                trace!(target: TRACE_TARGET, ?i, ?k, ?v, "Changeset 2 entry");
3303            }
3304
3305            harness.assert_changes(&mut apst, changeset2);
3306        }
3307    }
3308}