Skip to main content

reth_chain_state/
deferred_trie.rs

1use alloy_primitives::B256;
2use parking_lot::Mutex;
3use reth_metrics::{metrics::Counter, Metrics};
4use reth_trie::{
5    updates::{TrieUpdates, TrieUpdatesSorted},
6    HashedPostState, HashedPostStateSorted, TrieInputSorted,
7};
8use std::{
9    fmt,
10    sync::{Arc, LazyLock},
11};
12use tracing::{debug_span, instrument};
13
14/// Shared handle to asynchronously populated trie data.
15///
16/// Uses a try-lock + fallback computation approach for deadlock-free access.
17/// If the deferred task hasn't completed, computes trie data synchronously
18/// from stored unsorted inputs rather than blocking.
19#[derive(Clone)]
20pub struct DeferredTrieData {
21    /// Shared deferred state holding either raw inputs (pending) or computed result (ready).
22    state: Arc<Mutex<DeferredState>>,
23}
24
25/// Sorted trie data computed for an executed block.
26/// These represent the complete set of sorted trie data required to persist
27/// block state for, and generate proofs on top of, a block.
28#[derive(Clone, Debug, Default)]
29pub struct ComputedTrieData {
30    /// Sorted hashed post-state produced by execution.
31    pub hashed_state: Arc<HashedPostStateSorted>,
32    /// Sorted trie updates produced by state root computation.
33    pub trie_updates: Arc<TrieUpdatesSorted>,
34    /// Trie input bundled with its anchor hash, if available.
35    pub anchored_trie_input: Option<AnchoredTrieInput>,
36}
37
38/// Trie input bundled with its anchor hash.
39///
40/// The `trie_input` contains the **cumulative** overlay of all in-memory ancestor blocks,
41/// not just this block's changes. Child blocks reuse the parent's overlay in O(1) by
42/// cloning the Arc-wrapped data.
43///
44/// The `anchor_hash` is metadata indicating which persisted base state this overlay
45/// sits on top of. It is CRITICAL for overlay reuse decisions: an overlay built on top
46/// of Anchor A cannot be reused for a block anchored to Anchor B, as it would result
47/// in an incorrect state.
48#[derive(Clone, Debug)]
49pub struct AnchoredTrieInput {
50    /// The persisted ancestor hash this trie input is anchored to.
51    pub anchor_hash: B256,
52    /// Cumulative trie input overlay from all in-memory ancestors.
53    pub trie_input: Arc<TrieInputSorted>,
54}
55
56/// Metrics for deferred trie computation.
57#[derive(Metrics)]
58#[metrics(scope = "sync.block_validation")]
59struct DeferredTrieMetrics {
60    /// Number of times deferred trie data was ready (async task completed first).
61    deferred_trie_async_ready: Counter,
62    /// Number of times deferred trie data required synchronous computation (fallback path).
63    deferred_trie_sync_fallback: Counter,
64}
65
66static DEFERRED_TRIE_METRICS: LazyLock<DeferredTrieMetrics> =
67    LazyLock::new(DeferredTrieMetrics::default);
68
69/// Internal state for deferred trie data.
70enum DeferredState {
71    /// Data is not yet available; raw inputs stored for fallback computation.
72    /// Wrapped in `Option` to allow taking ownership during computation.
73    Pending(Option<PendingInputs>),
74    /// Data has been computed and is ready.
75    Ready(ComputedTrieData),
76}
77
78/// Inputs kept while a deferred trie computation is pending.
79#[derive(Clone, Debug)]
80struct PendingInputs {
81    /// Unsorted hashed post-state from execution.
82    hashed_state: Arc<HashedPostState>,
83    /// Unsorted trie updates from state root computation.
84    trie_updates: Arc<TrieUpdates>,
85    /// The persisted ancestor hash this trie input is anchored to.
86    anchor_hash: B256,
87    /// Deferred trie data from ancestor blocks for merging.
88    ancestors: Vec<DeferredTrieData>,
89}
90
91impl fmt::Debug for DeferredTrieData {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        let state = self.state.lock();
94        match &*state {
95            DeferredState::Pending(_) => {
96                f.debug_struct("DeferredTrieData").field("state", &"pending").finish()
97            }
98            DeferredState::Ready(_) => {
99                f.debug_struct("DeferredTrieData").field("state", &"ready").finish()
100            }
101        }
102    }
103}
104
105impl DeferredTrieData {
106    /// Create a new pending handle with fallback inputs for synchronous computation.
107    ///
108    /// If the async task hasn't completed when `wait_cloned` is called, the trie data
109    /// will be computed synchronously from these inputs. This eliminates deadlock risk.
110    ///
111    /// # Arguments
112    /// * `hashed_state` - Unsorted hashed post-state from execution
113    /// * `trie_updates` - Unsorted trie updates from state root computation
114    /// * `anchor_hash` - The persisted ancestor hash this trie input is anchored to
115    /// * `ancestors` - Deferred trie data from ancestor blocks for merging
116    pub fn pending(
117        hashed_state: Arc<HashedPostState>,
118        trie_updates: Arc<TrieUpdates>,
119        anchor_hash: B256,
120        ancestors: Vec<Self>,
121    ) -> Self {
122        Self {
123            state: Arc::new(Mutex::new(DeferredState::Pending(Some(PendingInputs {
124                hashed_state,
125                trie_updates,
126                anchor_hash,
127                ancestors,
128            })))),
129        }
130    }
131
132    /// Create a handle that is already populated with the given [`ComputedTrieData`].
133    ///
134    /// Useful when trie data is available immediately.
135    /// [`Self::wait_cloned`] will return without any computation.
136    pub fn ready(bundle: ComputedTrieData) -> Self {
137        Self { state: Arc::new(Mutex::new(DeferredState::Ready(bundle))) }
138    }
139
140    /// Sort block execution outputs and build a [`TrieInputSorted`] overlay.
141    ///
142    /// The trie input overlay accumulates sorted hashed state (account/storage changes) and
143    /// trie node updates from all in-memory ancestor blocks. This overlay is required for:
144    /// - Computing state roots on top of in-memory blocks
145    /// - Generating storage/account proofs for unpersisted state
146    ///
147    /// # Process
148    /// 1. Sort the current block's hashed state and trie updates
149    /// 2. Reuse parent's cached overlay if available (O(1) - the common case)
150    /// 3. Otherwise, rebuild overlay from ancestors (rare fallback)
151    /// 4. Extend the overlay with this block's sorted data
152    ///
153    /// Used by both the async background task and the synchronous fallback path.
154    ///
155    /// # Arguments
156    /// * `hashed_state` - Unsorted hashed post-state (account/storage changes) from execution
157    /// * `trie_updates` - Unsorted trie node updates from state root computation
158    /// * `anchor_hash` - The persisted ancestor hash this trie input is anchored to
159    /// * `ancestors` - Deferred trie data from ancestor blocks for merging (oldest -> newest)
160    pub fn sort_and_build_trie_input(
161        hashed_state: Arc<HashedPostState>,
162        trie_updates: Arc<TrieUpdates>,
163        anchor_hash: B256,
164        ancestors: &[Self],
165    ) -> ComputedTrieData {
166        let _span = debug_span!(target: "engine::tree::deferred_trie", "sort_inputs").entered();
167
168        #[cfg(feature = "rayon")]
169        let (sorted_hashed_state, sorted_trie_updates) = rayon::join(
170            || match Arc::try_unwrap(hashed_state) {
171                Ok(state) => state.into_sorted(),
172                Err(arc) => arc.clone_into_sorted(),
173            },
174            || match Arc::try_unwrap(trie_updates) {
175                Ok(updates) => updates.into_sorted(),
176                Err(arc) => arc.clone_into_sorted(),
177            },
178        );
179
180        #[cfg(not(feature = "rayon"))]
181        let (sorted_hashed_state, sorted_trie_updates) = (
182            match Arc::try_unwrap(hashed_state) {
183                Ok(state) => state.into_sorted(),
184                Err(arc) => arc.clone_into_sorted(),
185            },
186            match Arc::try_unwrap(trie_updates) {
187                Ok(updates) => updates.into_sorted(),
188                Err(arc) => arc.clone_into_sorted(),
189            },
190        );
191
192        drop(_span);
193
194        let _span = debug_span!(target: "engine::tree::deferred_trie", "build_overlay").entered();
195
196        // Reuse parent's overlay if available and anchors match.
197        // We can only reuse the parent's overlay if it was built on top of the same
198        // persisted anchor. If the anchor has changed (e.g., due to persistence),
199        // the parent's overlay is relative to an old state and cannot be used.
200        let overlay = if let Some(parent) = ancestors.last() {
201            let parent_data = parent.wait_cloned();
202
203            match &parent_data.anchored_trie_input {
204                // Case 1: Parent has cached overlay AND anchors match.
205                Some(AnchoredTrieInput { anchor_hash: parent_anchor, trie_input })
206                    if *parent_anchor == anchor_hash =>
207                {
208                    // O(1): Reuse parent's overlay, extend with current block's data.
209                    let mut overlay = TrieInputSorted::new(
210                        Arc::clone(&trie_input.nodes),
211                        Arc::clone(&trie_input.state),
212                        Default::default(), // prefix_sets are per-block, not cumulative
213                    );
214                    let _span =
215                        debug_span!(target: "engine::tree::deferred_trie", "extend_overlay")
216                            .entered();
217                    // Only trigger COW clone if there's actually data to add.
218                    #[cfg(feature = "rayon")]
219                    {
220                        rayon::join(
221                            || {
222                                if !sorted_hashed_state.is_empty() {
223                                    Arc::make_mut(&mut overlay.state)
224                                        .extend_ref_and_sort(&sorted_hashed_state);
225                                }
226                            },
227                            || {
228                                if !sorted_trie_updates.is_empty() {
229                                    Arc::make_mut(&mut overlay.nodes)
230                                        .extend_ref_and_sort(&sorted_trie_updates);
231                                }
232                            },
233                        );
234                    }
235                    #[cfg(not(feature = "rayon"))]
236                    {
237                        if !sorted_hashed_state.is_empty() {
238                            Arc::make_mut(&mut overlay.state)
239                                .extend_ref_and_sort(&sorted_hashed_state);
240                        }
241                        if !sorted_trie_updates.is_empty() {
242                            Arc::make_mut(&mut overlay.nodes)
243                                .extend_ref_and_sort(&sorted_trie_updates);
244                        }
245                    }
246                    overlay
247                }
248                // Case 2: Parent exists but anchor mismatch or no cached overlay.
249                // We must rebuild from the ancestors list (which only contains unpersisted blocks).
250                _ => Self::merge_ancestors_into_overlay(
251                    ancestors,
252                    &sorted_hashed_state,
253                    &sorted_trie_updates,
254                ),
255            }
256        } else {
257            // Case 3: No in-memory ancestors (first block after persisted anchor).
258            // Build overlay with just this block's data.
259            Self::merge_ancestors_into_overlay(&[], &sorted_hashed_state, &sorted_trie_updates)
260        };
261
262        ComputedTrieData::with_trie_input(
263            Arc::new(sorted_hashed_state),
264            Arc::new(sorted_trie_updates),
265            anchor_hash,
266            Arc::new(overlay),
267        )
268    }
269
270    /// Merge all ancestors and current block's data into a single overlay.
271    ///
272    /// This is a rare fallback path, only used when no ancestor has a cached
273    /// `anchored_trie_input` (e.g., blocks created via alternative constructors).
274    /// In normal operation, the parent always has a cached overlay and this
275    /// function is never called.
276    ///
277    /// When the `rayon` feature is enabled:
278    /// 1. Collects ancestor data (states and updates)
279    /// 2. Merges states and trie updates in parallel using k-way merge
280    #[cfg(feature = "rayon")]
281    fn merge_ancestors_into_overlay(
282        ancestors: &[Self],
283        sorted_hashed_state: &HashedPostStateSorted,
284        sorted_trie_updates: &TrieUpdatesSorted,
285    ) -> TrieInputSorted {
286        // Early exit: no ancestors means just wrap current block's data
287        if ancestors.is_empty() {
288            return TrieInputSorted::new(
289                Arc::new(sorted_trie_updates.clone()),
290                Arc::new(sorted_hashed_state.clone()),
291                Default::default(),
292            );
293        }
294
295        // Collect ancestor data in reverse (newest to oldest) for merge_slice
296        let (states, updates): (Vec<_>, Vec<_>) = ancestors
297            .iter()
298            .rev()
299            .map(|a| {
300                // Note: we can assume that this trie data has already been computed
301                let data = a.wait_cloned();
302                (data.hashed_state, data.trie_updates)
303            })
304            .unzip();
305
306        // Merge state and nodes in parallel using k-way merge
307        let (state, nodes) = rayon::join(
308            || {
309                let mut merged = HashedPostStateSorted::merge_slice(&states);
310                merged.extend_ref_and_sort(sorted_hashed_state);
311                merged
312            },
313            || {
314                let mut merged = TrieUpdatesSorted::merge_slice(&updates);
315                merged.extend_ref_and_sort(sorted_trie_updates);
316                merged
317            },
318        );
319
320        TrieInputSorted::new(Arc::new(nodes), Arc::new(state), Default::default())
321    }
322
323    /// Sequential fallback when rayon is not available.
324    #[cfg(not(feature = "rayon"))]
325    fn merge_ancestors_into_overlay(
326        ancestors: &[Self],
327        sorted_hashed_state: &HashedPostStateSorted,
328        sorted_trie_updates: &TrieUpdatesSorted,
329    ) -> TrieInputSorted {
330        let _span = debug_span!(target: "engine::tree::deferred_trie", "merge_ancestors", num_ancestors = ancestors.len()).entered();
331        let mut overlay = TrieInputSorted::default();
332
333        let state_mut = Arc::make_mut(&mut overlay.state);
334        let nodes_mut = Arc::make_mut(&mut overlay.nodes);
335
336        for ancestor in ancestors {
337            let ancestor_data = ancestor.wait_cloned();
338            state_mut.extend_ref_and_sort(ancestor_data.hashed_state.as_ref());
339            nodes_mut.extend_ref_and_sort(ancestor_data.trie_updates.as_ref());
340        }
341
342        state_mut.extend_ref_and_sort(sorted_hashed_state);
343        nodes_mut.extend_ref_and_sort(sorted_trie_updates);
344
345        overlay
346    }
347
348    /// Returns trie data, computing synchronously if the async task hasn't completed.
349    ///
350    /// - If the async task has completed (`Ready`), returns the cached result.
351    /// - If pending, computes synchronously from stored inputs.
352    ///
353    /// Deadlock is avoided as long as the provided ancestors form a true ancestor chain (a DAG):
354    /// - Each block only waits on its ancestors (blocks on the path to the persisted root)
355    /// - Sibling blocks (forks) are never in each other's ancestor lists
356    /// - A block never waits on its descendants
357    ///
358    /// Given that invariant, circular wait dependencies are impossible.
359    #[instrument(level = "debug", target = "engine::tree::deferred_trie", skip_all)]
360    pub fn wait_cloned(&self) -> ComputedTrieData {
361        let mut state = self.state.lock();
362        match &mut *state {
363            // If the deferred trie data is ready, return the cached result.
364            DeferredState::Ready(bundle) => {
365                DEFERRED_TRIE_METRICS.deferred_trie_async_ready.increment(1);
366                bundle.clone()
367            }
368            // If the deferred trie data is pending, compute the trie data synchronously and return
369            // the result. This is the fallback path if the async task hasn't completed.
370            DeferredState::Pending(maybe_inputs) => {
371                DEFERRED_TRIE_METRICS.deferred_trie_sync_fallback.increment(1);
372
373                let inputs = maybe_inputs.take().expect("inputs must be present in Pending state");
374
375                let computed = Self::sort_and_build_trie_input(
376                    inputs.hashed_state,
377                    inputs.trie_updates,
378                    inputs.anchor_hash,
379                    &inputs.ancestors,
380                );
381                *state = DeferredState::Ready(computed.clone());
382
383                // Release lock before inputs (and its ancestors) drop to avoid holding it
384                // while their potential last Arc refs drop (which could trigger recursive locking)
385                drop(state);
386
387                computed
388            }
389        }
390    }
391}
392
393impl ComputedTrieData {
394    /// Construct a bundle that includes trie input anchored to a persisted ancestor.
395    pub const fn with_trie_input(
396        hashed_state: Arc<HashedPostStateSorted>,
397        trie_updates: Arc<TrieUpdatesSorted>,
398        anchor_hash: B256,
399        trie_input: Arc<TrieInputSorted>,
400    ) -> Self {
401        Self {
402            hashed_state,
403            trie_updates,
404            anchored_trie_input: Some(AnchoredTrieInput { anchor_hash, trie_input }),
405        }
406    }
407
408    /// Construct a bundle without trie input or anchor information.
409    ///
410    /// Unlike [`Self::with_trie_input`], this constructor omits the accumulated trie input overlay
411    /// and its anchor hash. Use this when the trie input is not needed, such as in block builders
412    /// or sequencers that don't require proof generation on top of in-memory state.
413    ///
414    /// The trie input anchor identifies the persisted block hash from which the in-memory overlay
415    /// was built. Without it, consumers cannot determine which on-disk state to combine with.
416    pub const fn without_trie_input(
417        hashed_state: Arc<HashedPostStateSorted>,
418        trie_updates: Arc<TrieUpdatesSorted>,
419    ) -> Self {
420        Self { hashed_state, trie_updates, anchored_trie_input: None }
421    }
422
423    /// Returns the anchor hash, if present.
424    pub fn anchor_hash(&self) -> Option<B256> {
425        self.anchored_trie_input.as_ref().map(|anchored| anchored.anchor_hash)
426    }
427
428    /// Returns the trie input, if present.
429    pub fn trie_input(&self) -> Option<&Arc<TrieInputSorted>> {
430        self.anchored_trie_input.as_ref().map(|anchored| &anchored.trie_input)
431    }
432}
433
434#[cfg(test)]
435mod tests {
436    use super::*;
437    use alloy_primitives::{map::B256Map, U256};
438    use reth_primitives_traits::Account;
439    use reth_trie::updates::TrieUpdates;
440    use std::{
441        sync::Arc,
442        thread,
443        time::{Duration, Instant},
444    };
445
446    fn empty_bundle() -> ComputedTrieData {
447        ComputedTrieData {
448            hashed_state: Arc::default(),
449            trie_updates: Arc::default(),
450            anchored_trie_input: None,
451        }
452    }
453
454    fn empty_pending() -> DeferredTrieData {
455        empty_pending_with_anchor(B256::ZERO)
456    }
457
458    fn empty_pending_with_anchor(anchor: B256) -> DeferredTrieData {
459        DeferredTrieData::pending(
460            Arc::new(HashedPostState::default()),
461            Arc::new(TrieUpdates::default()),
462            anchor,
463            Vec::new(),
464        )
465    }
466
467    /// Verifies that a ready handle returns immediately without computation.
468    #[test]
469    fn ready_returns_immediately() {
470        let bundle = empty_bundle();
471        let deferred = DeferredTrieData::ready(bundle.clone());
472
473        let start = Instant::now();
474        let result = deferred.wait_cloned();
475        let elapsed = start.elapsed();
476
477        assert_eq!(result.hashed_state, bundle.hashed_state);
478        assert_eq!(result.trie_updates, bundle.trie_updates);
479        assert_eq!(result.anchor_hash(), bundle.anchor_hash());
480        assert!(elapsed < Duration::from_millis(20));
481    }
482
483    /// Verifies that a pending handle computes trie data synchronously via fallback.
484    #[test]
485    fn pending_computes_fallback() {
486        let deferred = empty_pending();
487
488        // wait_cloned should compute from inputs without blocking
489        let start = Instant::now();
490        let result = deferred.wait_cloned();
491        let elapsed = start.elapsed();
492
493        // Should return quickly (fallback computation)
494        assert!(elapsed < Duration::from_millis(100));
495        assert!(result.hashed_state.is_empty());
496    }
497
498    /// Verifies that fallback computation result is cached for subsequent calls.
499    #[test]
500    fn fallback_result_is_cached() {
501        let deferred = empty_pending();
502
503        // First call computes and should stash the result
504        let first = deferred.wait_cloned();
505        // Second call should reuse the cached result (same Arc pointer)
506        let second = deferred.wait_cloned();
507
508        assert!(Arc::ptr_eq(&first.hashed_state, &second.hashed_state));
509        assert!(Arc::ptr_eq(&first.trie_updates, &second.trie_updates));
510        assert_eq!(first.anchor_hash(), second.anchor_hash());
511    }
512
513    /// Verifies that concurrent `wait_cloned` calls result in only one computation,
514    /// with all callers receiving the same cached result.
515    #[test]
516    fn concurrent_wait_cloned_computes_once() {
517        let deferred = empty_pending();
518
519        // Spawn multiple threads that all call wait_cloned concurrently
520        let handles: Vec<_> = (0..10)
521            .map(|_| {
522                let d = deferred.clone();
523                thread::spawn(move || d.wait_cloned())
524            })
525            .collect();
526
527        // Collect all results
528        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
529
530        // All results should share the same Arc pointers (same computed result)
531        let first = &results[0];
532        for result in &results[1..] {
533            assert!(Arc::ptr_eq(&first.hashed_state, &result.hashed_state));
534            assert!(Arc::ptr_eq(&first.trie_updates, &result.trie_updates));
535        }
536    }
537
538    /// Tests that ancestor trie data is merged during fallback computation and that the
539    /// resulting `ComputedTrieData` uses the current block's anchor hash, not the ancestor's.
540    #[test]
541    fn ancestors_are_merged() {
542        // Create ancestor with some data
543        let ancestor_bundle = ComputedTrieData {
544            hashed_state: Arc::default(),
545            trie_updates: Arc::default(),
546            anchored_trie_input: Some(AnchoredTrieInput {
547                anchor_hash: B256::with_last_byte(1),
548                trie_input: Arc::new(TrieInputSorted::default()),
549            }),
550        };
551        let ancestor = DeferredTrieData::ready(ancestor_bundle);
552
553        // Create pending with ancestor
554        let deferred = DeferredTrieData::pending(
555            Arc::new(HashedPostState::default()),
556            Arc::new(TrieUpdates::default()),
557            B256::with_last_byte(2),
558            vec![ancestor],
559        );
560
561        let result = deferred.wait_cloned();
562        // Should have the current block's anchor, not the ancestor's
563        assert_eq!(result.anchor_hash(), Some(B256::with_last_byte(2)));
564    }
565
566    /// Ensures ancestor overlays are merged oldest -> newest so latest state wins (no overwrite by
567    /// older ancestors).
568    #[test]
569    fn ancestors_merge_in_chronological_order() {
570        let key = B256::with_last_byte(1);
571        // Oldest ancestor sets nonce to 1
572        let oldest_state = HashedPostStateSorted::new(
573            vec![(key, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
574            B256Map::default(),
575        );
576        // Newest ancestor overwrites nonce to 2
577        let newest_state = HashedPostStateSorted::new(
578            vec![(key, Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }))],
579            B256Map::default(),
580        );
581
582        let oldest = ComputedTrieData {
583            hashed_state: Arc::new(oldest_state),
584            trie_updates: Arc::default(),
585            anchored_trie_input: None,
586        };
587        let newest = ComputedTrieData {
588            hashed_state: Arc::new(newest_state),
589            trie_updates: Arc::default(),
590            anchored_trie_input: None,
591        };
592
593        // Pass ancestors oldest -> newest; newest should take precedence
594        let deferred = DeferredTrieData::pending(
595            Arc::new(HashedPostState::default()),
596            Arc::new(TrieUpdates::default()),
597            B256::ZERO,
598            vec![DeferredTrieData::ready(oldest), DeferredTrieData::ready(newest)],
599        );
600
601        let result = deferred.wait_cloned();
602        let overlay_state = &result.anchored_trie_input.as_ref().unwrap().trie_input.state.accounts;
603        assert_eq!(overlay_state.len(), 1);
604        let (_, account) = &overlay_state[0];
605        assert_eq!(account.unwrap().nonce, 2);
606    }
607
608    /// Helper to create a ready block with anchored trie input containing specific state.
609    fn ready_block_with_state(
610        anchor_hash: B256,
611        accounts: Vec<(B256, Option<Account>)>,
612    ) -> DeferredTrieData {
613        let hashed_state = Arc::new(HashedPostStateSorted::new(accounts, B256Map::default()));
614        let trie_updates = Arc::default();
615        let mut overlay = TrieInputSorted::default();
616        Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state.as_ref());
617
618        DeferredTrieData::ready(ComputedTrieData {
619            hashed_state,
620            trie_updates,
621            anchored_trie_input: Some(AnchoredTrieInput {
622                anchor_hash,
623                trie_input: Arc::new(overlay),
624            }),
625        })
626    }
627
628    /// Verifies that first block after anchor (no ancestors) creates empty base overlay.
629    #[test]
630    fn first_block_after_anchor_creates_empty_base() {
631        let anchor = B256::with_last_byte(1);
632        let key = B256::with_last_byte(42);
633        let account = Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None };
634
635        // First block after anchor - no ancestors
636        let first_block = DeferredTrieData::pending(
637            Arc::new(HashedPostState::default().with_accounts([(key, Some(account))])),
638            Arc::new(TrieUpdates::default()),
639            anchor,
640            vec![], // No ancestors
641        );
642
643        let result = first_block.wait_cloned();
644
645        // Should have overlay with just this block's data
646        let overlay = result.anchored_trie_input.as_ref().unwrap();
647        assert_eq!(overlay.anchor_hash, anchor);
648        assert_eq!(overlay.trie_input.state.accounts.len(), 1);
649        let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
650        assert_eq!(*found_key, key);
651        assert_eq!(found_account.unwrap().nonce, 1);
652    }
653
654    /// Verifies that parent's overlay is reused regardless of anchor.
655    #[test]
656    fn reuses_parent_overlay() {
657        let anchor = B256::with_last_byte(1);
658        let key = B256::with_last_byte(42);
659        let account = Account { nonce: 100, balance: U256::ZERO, bytecode_hash: None };
660
661        // Create parent with anchored trie input
662        let parent = ready_block_with_state(anchor, vec![(key, Some(account))]);
663
664        // Create child - should reuse parent's overlay
665        let child = DeferredTrieData::pending(
666            Arc::new(HashedPostState::default()),
667            Arc::new(TrieUpdates::default()),
668            anchor,
669            vec![parent],
670        );
671
672        let result = child.wait_cloned();
673
674        // Verify parent's account is in the overlay
675        let overlay = result.anchored_trie_input.as_ref().unwrap();
676        assert_eq!(overlay.anchor_hash, anchor);
677        assert_eq!(overlay.trie_input.state.accounts.len(), 1);
678        let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
679        assert_eq!(*found_key, key);
680        assert_eq!(found_account.unwrap().nonce, 100);
681    }
682
683    /// Verifies that parent's overlay is NOT reused when anchor changes (after persist).
684    /// The overlay data is dependent on the anchor, so it must be rebuilt from the
685    /// remaining ancestors.
686    #[test]
687    fn rebuilds_overlay_when_anchor_changes() {
688        let old_anchor = B256::with_last_byte(1);
689        let new_anchor = B256::with_last_byte(2);
690        let key = B256::with_last_byte(42);
691        let account = Account { nonce: 50, balance: U256::ZERO, bytecode_hash: None };
692
693        // Create parent with OLD anchor
694        let parent = ready_block_with_state(old_anchor, vec![(key, Some(account))]);
695
696        // Create child with NEW anchor (simulates after persist)
697        // Should NOT reuse parent's overlay because anchor changed
698        let child = DeferredTrieData::pending(
699            Arc::new(HashedPostState::default()),
700            Arc::new(TrieUpdates::default()),
701            new_anchor,
702            vec![parent],
703        );
704
705        let result = child.wait_cloned();
706
707        // Verify result uses new anchor
708        let overlay = result.anchored_trie_input.as_ref().unwrap();
709        assert_eq!(overlay.anchor_hash, new_anchor);
710
711        // Crucially, since we provided `parent` in ancestors but it has a different anchor,
712        // the code falls back to `merge_ancestors_into_overlay`.
713        // `merge_ancestors_into_overlay` reads `parent.hashed_state` (which has the account).
714        // So the account IS present, but it was obtained via REBUILD, not REUSE.
715        // We can check `DEFERRED_TRIE_METRICS` if we want to be sure, but functionally:
716        assert_eq!(overlay.trie_input.state.accounts.len(), 1);
717        let (found_key, found_account) = &overlay.trie_input.state.accounts[0];
718        assert_eq!(*found_key, key);
719        assert_eq!(found_account.unwrap().nonce, 50);
720    }
721
722    /// Verifies that parent without `anchored_trie_input` triggers rebuild path.
723    #[test]
724    fn rebuilds_when_parent_has_no_anchored_input() {
725        let anchor = B256::with_last_byte(1);
726        let key = B256::with_last_byte(42);
727        let account = Account { nonce: 25, balance: U256::ZERO, bytecode_hash: None };
728
729        // Create parent WITHOUT anchored trie input (e.g., from without_trie_input constructor)
730        let parent_state =
731            HashedPostStateSorted::new(vec![(key, Some(account))], B256Map::default());
732        let parent = DeferredTrieData::ready(ComputedTrieData {
733            hashed_state: Arc::new(parent_state),
734            trie_updates: Arc::default(),
735            anchored_trie_input: None, // No anchored input
736        });
737
738        // Create child - should rebuild from parent's hashed_state
739        let child = DeferredTrieData::pending(
740            Arc::new(HashedPostState::default()),
741            Arc::new(TrieUpdates::default()),
742            anchor,
743            vec![parent],
744        );
745
746        let result = child.wait_cloned();
747
748        // Verify overlay is built and contains parent's data
749        let overlay = result.anchored_trie_input.as_ref().unwrap();
750        assert_eq!(overlay.anchor_hash, anchor);
751        assert_eq!(overlay.trie_input.state.accounts.len(), 1);
752    }
753
754    /// Verifies that a chain of blocks with matching anchors builds correct cumulative overlay.
755    #[test]
756    fn chain_of_blocks_builds_cumulative_overlay() {
757        let anchor = B256::with_last_byte(1);
758        let key1 = B256::with_last_byte(1);
759        let key2 = B256::with_last_byte(2);
760        let key3 = B256::with_last_byte(3);
761
762        // Block 1: sets account at key1
763        let block1 = ready_block_with_state(
764            anchor,
765            vec![(key1, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
766        );
767
768        // Block 2: adds account at key2, ancestor is block1
769        let block2_hashed = HashedPostState::default().with_accounts([(
770            key2,
771            Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }),
772        )]);
773        let block2 = DeferredTrieData::pending(
774            Arc::new(block2_hashed),
775            Arc::new(TrieUpdates::default()),
776            anchor,
777            vec![block1.clone()],
778        );
779        // Compute block2's trie data
780        let block2_computed = block2.wait_cloned();
781        let block2_ready = DeferredTrieData::ready(block2_computed);
782
783        // Block 3: adds account at key3, ancestor is block2 (which includes block1)
784        let block3_hashed = HashedPostState::default().with_accounts([(
785            key3,
786            Some(Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }),
787        )]);
788        let block3 = DeferredTrieData::pending(
789            Arc::new(block3_hashed),
790            Arc::new(TrieUpdates::default()),
791            anchor,
792            vec![block1, block2_ready],
793        );
794
795        let result = block3.wait_cloned();
796
797        // Verify all three accounts are in the cumulative overlay
798        let overlay = result.anchored_trie_input.as_ref().unwrap();
799        assert_eq!(overlay.trie_input.state.accounts.len(), 3);
800
801        // Accounts should be sorted by key (B256 ordering)
802        let accounts = &overlay.trie_input.state.accounts;
803        assert!(accounts.iter().any(|(k, a)| *k == key1 && a.unwrap().nonce == 1));
804        assert!(accounts.iter().any(|(k, a)| *k == key2 && a.unwrap().nonce == 2));
805        assert!(accounts.iter().any(|(k, a)| *k == key3 && a.unwrap().nonce == 3));
806    }
807
808    /// Verifies that child block's state overwrites parent's state for the same key.
809    #[test]
810    fn child_state_overwrites_parent() {
811        let anchor = B256::with_last_byte(1);
812        let key = B256::with_last_byte(42);
813
814        // Parent sets nonce to 10
815        let parent = ready_block_with_state(
816            anchor,
817            vec![(key, Some(Account { nonce: 10, balance: U256::ZERO, bytecode_hash: None }))],
818        );
819
820        // Child overwrites nonce to 99
821        let child_hashed = HashedPostState::default().with_accounts([(
822            key,
823            Some(Account { nonce: 99, balance: U256::ZERO, bytecode_hash: None }),
824        )]);
825        let child = DeferredTrieData::pending(
826            Arc::new(child_hashed),
827            Arc::new(TrieUpdates::default()),
828            anchor,
829            vec![parent],
830        );
831
832        let result = child.wait_cloned();
833
834        // Verify child's value wins (extend_ref uses later value)
835        let overlay = result.anchored_trie_input.as_ref().unwrap();
836        // Note: extend_ref may result in duplicate keys; check the last occurrence
837        let accounts = &overlay.trie_input.state.accounts;
838        let last_account = accounts.iter().rfind(|(k, _)| *k == key).unwrap();
839        assert_eq!(last_account.1.unwrap().nonce, 99);
840    }
841
842    /// Stress test: verify O(N) behavior by building a chain of many blocks.
843    /// This test ensures the fix doesn't regress - previously this would be O(N²).
844    #[test]
845    fn long_chain_builds_in_linear_time() {
846        let anchor = B256::with_last_byte(1);
847        let num_blocks = 50; // Enough to notice O(N²) vs O(N) difference
848
849        let mut ancestors: Vec<DeferredTrieData> = Vec::new();
850
851        let start = Instant::now();
852
853        for i in 0..num_blocks {
854            let key = B256::with_last_byte(i as u8);
855            let account = Account { nonce: i as u64, balance: U256::ZERO, bytecode_hash: None };
856            let hashed = HashedPostState::default().with_accounts([(key, Some(account))]);
857
858            let block = DeferredTrieData::pending(
859                Arc::new(hashed),
860                Arc::new(TrieUpdates::default()),
861                anchor,
862                ancestors.clone(),
863            );
864
865            // Compute and add to ancestors for next iteration
866            let computed = block.wait_cloned();
867            ancestors.push(DeferredTrieData::ready(computed));
868        }
869
870        let elapsed = start.elapsed();
871
872        // With O(N) fix, 50 blocks should complete quickly (< 1 second)
873        // With O(N²), this would take significantly longer
874        assert!(
875            elapsed < Duration::from_secs(2),
876            "Chain of {num_blocks} blocks took {:?}, possible O(N²) regression",
877            elapsed
878        );
879
880        // Verify final overlay has all accounts
881        let final_result = ancestors.last().unwrap().wait_cloned();
882        let overlay = final_result.anchored_trie_input.as_ref().unwrap();
883        assert_eq!(overlay.trie_input.state.accounts.len(), num_blocks);
884    }
885
886    /// Verifies that a multi-ancestor overlay is rebuilt when anchor changes.
887    /// This simulates the "persist prefix then keep building" scenario where:
888    /// 1. A chain of blocks is built with anchor A
889    /// 2. Some blocks are persisted, changing anchor to B
890    /// 3. New blocks must rebuild the overlay from the remaining ancestors
891    #[test]
892    fn multi_ancestor_overlay_rebuilt_after_anchor_change() {
893        let old_anchor = B256::with_last_byte(1);
894        let new_anchor = B256::with_last_byte(2);
895        let key1 = B256::with_last_byte(1);
896        let key2 = B256::with_last_byte(2);
897        let key3 = B256::with_last_byte(3);
898        let key4 = B256::with_last_byte(4);
899
900        // Build a chain of 3 blocks with old_anchor
901        let block1 = ready_block_with_state(
902            old_anchor,
903            vec![(key1, Some(Account { nonce: 1, balance: U256::ZERO, bytecode_hash: None }))],
904        );
905
906        let block2_hashed = HashedPostState::default().with_accounts([(
907            key2,
908            Some(Account { nonce: 2, balance: U256::ZERO, bytecode_hash: None }),
909        )]);
910        let block2 = DeferredTrieData::pending(
911            Arc::new(block2_hashed),
912            Arc::new(TrieUpdates::default()),
913            old_anchor,
914            vec![block1.clone()],
915        );
916        let block2_ready = DeferredTrieData::ready(block2.wait_cloned());
917
918        let block3_hashed = HashedPostState::default().with_accounts([(
919            key3,
920            Some(Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }),
921        )]);
922        let block3 = DeferredTrieData::pending(
923            Arc::new(block3_hashed),
924            Arc::new(TrieUpdates::default()),
925            old_anchor,
926            vec![block1.clone(), block2_ready.clone()],
927        );
928        let block3_ready = DeferredTrieData::ready(block3.wait_cloned());
929
930        // Verify block3's overlay has all 3 accounts with old_anchor
931        let block3_overlay = block3_ready.wait_cloned().anchored_trie_input.unwrap();
932        assert_eq!(block3_overlay.anchor_hash, old_anchor);
933        assert_eq!(block3_overlay.trie_input.state.accounts.len(), 3);
934
935        // Now simulate persist: create block4 with NEW anchor but same ancestors.
936        // To verify correct rebuilding, we must provide ALL unpersisted ancestors.
937        // If we only provided block3, the rebuild would only see block3's state.
938        // We pass block1, block2, block3 to simulate that they are all still in memory
939        // but the anchor check forces a rebuild (e.g. artificial anchor change).
940        let block4_hashed = HashedPostState::default().with_accounts([(
941            key4,
942            Some(Account { nonce: 4, balance: U256::ZERO, bytecode_hash: None }),
943        )]);
944        let block4 = DeferredTrieData::pending(
945            Arc::new(block4_hashed),
946            Arc::new(TrieUpdates::default()),
947            new_anchor, // Different anchor - simulates post-persist
948            vec![block1, block2_ready, block3_ready],
949        );
950
951        let result = block4.wait_cloned();
952
953        // Verify:
954        // 1. New anchor is used in result
955        assert_eq!(result.anchor_hash(), Some(new_anchor));
956
957        // 2. All 4 accounts are in the overlay (rebuilt from ancestors + extended)
958        let overlay = result.anchored_trie_input.as_ref().unwrap();
959        assert_eq!(overlay.trie_input.state.accounts.len(), 4);
960
961        // 3. All accounts have correct values
962        let accounts = &overlay.trie_input.state.accounts;
963        assert!(accounts.iter().any(|(k, a)| *k == key1 && a.unwrap().nonce == 1));
964        assert!(accounts.iter().any(|(k, a)| *k == key2 && a.unwrap().nonce == 2));
965        assert!(accounts.iter().any(|(k, a)| *k == key3 && a.unwrap().nonce == 3));
966        assert!(accounts.iter().any(|(k, a)| *k == key4 && a.unwrap().nonce == 4));
967    }
968}