Skip to main content

reth_chain_state/
state_trie_overlay.rs

1//! Flattened state trie overlays for in-memory blocks.
2//!
3//! Payload validation needs a view of the state trie as of an in-memory parent block even when that
4//! parent has not been persisted yet. [`StateTrieOverlayManager`] tracks those in-memory blocks and
5//! builds reusable flattened state trie overlays on demand.
6
7use crate::{EthPrimitives, ExecutedBlock};
8use alloy_primitives::B256;
9use reth_metrics::{
10    metrics::{Counter, Histogram},
11    Metrics,
12};
13use reth_primitives_traits::{
14    dashmap::{mapref::entry::Entry, DashMap},
15    AlloyBlockHeader, NodePrimitives,
16};
17#[cfg(feature = "rayon")]
18use reth_tasks::WorkerPool;
19use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
20use std::{fmt, sync::Arc, time::Instant};
21use tracing::{debug, trace};
22
23/// Manages flattened state trie overlays for in-memory blocks.
24///
25/// The manager owns the in-memory block graph and a cache of flattened state trie overlays keyed by
26/// `(anchor_hash, tip_hash)`.
27#[derive(Clone)]
28pub struct StateTrieOverlayManager<N: NodePrimitives = EthPrimitives> {
29    blocks: Arc<DashMap<B256, ExecutedBlock<N>>>,
30    overlays: Arc<DashMap<OverlayCacheKey, Arc<TrieInputSorted>>>,
31    #[cfg(feature = "rayon")]
32    worker_pool: Option<Arc<WorkerPool>>,
33    metrics: StateTrieOverlayMetrics,
34}
35
36/// Metrics for state trie overlay management.
37#[derive(Clone, Metrics)]
38#[metrics(scope = "sync.block_validation.state_trie_overlay")]
39struct StateTrieOverlayMetrics {
40    /// Duration of overlay computation in seconds.
41    overlay_computation_duration_seconds: Histogram,
42    /// Number of requests satisfied by an existing overlay cache entry.
43    overlay_cache_reuses: Counter,
44    /// Number of overlay cache entries populated by computing an overlay.
45    overlay_cache_fills: Counter,
46}
47
48impl<N: NodePrimitives> Default for StateTrieOverlayManager<N> {
49    fn default() -> Self {
50        Self {
51            blocks: Default::default(),
52            overlays: Default::default(),
53            #[cfg(feature = "rayon")]
54            worker_pool: None,
55            metrics: Default::default(),
56        }
57    }
58}
59
60impl<N: NodePrimitives> std::fmt::Debug for StateTrieOverlayManager<N> {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("StateTrieOverlayManager")
63            .field("blocks", &self.blocks.len())
64            .field("overlays", &self.overlays.len())
65            .finish()
66    }
67}
68
69impl<N: NodePrimitives> StateTrieOverlayManager<N> {
70    /// Create a new [`StateTrieOverlayManager`] backed by the given worker pool.
71    #[cfg(feature = "rayon")]
72    pub fn new(worker_pool: Arc<WorkerPool>) -> Self {
73        Self {
74            blocks: Default::default(),
75            overlays: Default::default(),
76            worker_pool: Some(worker_pool),
77            metrics: Default::default(),
78        }
79    }
80
81    /// Inserts an executed in-memory block into the state trie overlay manager.
82    #[tracing::instrument(
83        level = "trace",
84        target = "chain_state::state_trie_overlay",
85        skip_all,
86        fields(
87            block_hash = %block.recovered_block().hash(),
88            parent_hash = %block.recovered_block().parent_hash(),
89            duplicate = false,
90        )
91    )]
92    pub fn insert_block(&self, block: ExecutedBlock<N>) {
93        let hash = block.recovered_block().hash();
94        let parent_hash = block.recovered_block().parent_hash();
95        let span = tracing::Span::current();
96
97        // First add the block to the live graph; duplicate inserts do not need cache work.
98        match self.blocks.entry(hash) {
99            Entry::Occupied(_) => {
100                span.record("duplicate", true);
101                debug!(
102                    target: "chain_state::state_trie_overlay",
103                    %hash,
104                    %parent_hash,
105                    "state trie overlay block already inserted"
106                );
107                return
108            }
109            Entry::Vacant(entry) => {
110                entry.insert(block);
111            }
112        }
113
114        // Snapshot matching parent overlays before spawning so DashMap iteration guards are
115        // dropped.
116        let cached_parent_overlays = self
117            .overlays
118            .iter()
119            .filter_map(|entry| {
120                let key = *entry.key();
121                (key.tip_hash == parent_hash).then_some(key.anchor_hash)
122            })
123            .collect::<Vec<_>>();
124
125        debug!(
126            target: "chain_state::state_trie_overlay",
127            %hash,
128            %parent_hash,
129            "inserted block into state trie overlay manager"
130        );
131        if cached_parent_overlays.is_empty() {
132            return
133        }
134
135        #[cfg(feature = "rayon")]
136        let Some(worker_pool) = self.worker_pool.clone() else {
137            return
138        };
139
140        #[cfg(not(feature = "rayon"))]
141        let _ = cached_parent_overlays;
142
143        #[cfg(feature = "rayon")]
144        {
145            let parent_span = span;
146            for anchor_hash in cached_parent_overlays {
147                let manager = <Self as Clone>::clone(self);
148                let parent_span = parent_span.clone();
149                worker_pool.spawn(move || {
150                    let _span = tracing::trace_span!(
151                        target: "chain_state::state_trie_overlay",
152                        parent: parent_span,
153                        "precompute_state_trie_overlay",
154                        tip_hash = %hash,
155                        anchor_hash = %anchor_hash,
156                    )
157                    .entered();
158                    let _ = manager.get_overlay(hash, anchor_hash);
159                });
160            }
161        }
162    }
163
164    /// Removes blocks from the live block graph and prunes cached overlays that can no longer be
165    /// built from the remaining blocks.
166    #[tracing::instrument(
167        level = "trace",
168        target = "chain_state::state_trie_overlay",
169        skip_all,
170        fields(
171            block_count = tracing::field::Empty,
172            removed_blocks = tracing::field::Empty,
173            pruned_overlays = tracing::field::Empty,
174        )
175    )]
176    pub fn remove_blocks(&self, hashes: impl IntoIterator<Item = B256>) {
177        let span = tracing::Span::current();
178
179        // Remove blocks first, then prune overlays against the remaining block graph.
180        let mut block_count = 0usize;
181        let mut removed_blocks = 0usize;
182        let mut pruned_overlays = 0usize;
183        for hash in hashes {
184            block_count += 1;
185            removed_blocks += self.blocks.remove(&hash).is_some() as usize;
186        }
187        span.record("block_count", block_count);
188        span.record("removed_blocks", removed_blocks);
189
190        if removed_blocks > 0 {
191            let overlays_before = self.overlays.len();
192            let blocks = Arc::clone(&self.blocks);
193            self.overlays.retain(|key, _| {
194                key.tip_hash != key.anchor_hash &&
195                    Self::anchor_for_parent_in(blocks.as_ref(), key.tip_hash, key.anchor_hash) ==
196                        Some(key.anchor_hash)
197            });
198            pruned_overlays = overlays_before.saturating_sub(self.overlays.len());
199            span.record("pruned_overlays", pruned_overlays);
200        }
201        debug!(
202            target: "chain_state::state_trie_overlay",
203            block_count,
204            removed_blocks,
205            pruned_overlays,
206            "removed blocks from state trie overlay manager"
207        );
208    }
209
210    /// Returns the flattened overlay from `anchor_hash` to `parent_hash`.
211    #[tracing::instrument(
212        level = "trace",
213        target = "chain_state::state_trie_overlay",
214        skip_all,
215        fields(tip_hash = %parent_hash, anchor_hash = %anchor_hash)
216    )]
217    pub fn overlay_for_parent(
218        &self,
219        parent_hash: B256,
220        anchor_hash: B256,
221    ) -> Result<(Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>), StateTrieOverlayError> {
222        debug!(
223            target: "chain_state::state_trie_overlay",
224            tip_hash = %parent_hash,
225            %anchor_hash,
226            "loading state trie overlay for parent"
227        );
228        let input = self.get_overlay(parent_hash, anchor_hash)?;
229        Ok((Arc::clone(&input.nodes), Arc::clone(&input.state)))
230    }
231
232    #[tracing::instrument(
233        level = "trace",
234        target = "chain_state::state_trie_overlay",
235        skip_all,
236        fields(
237            tip_hash = %tip_hash,
238            anchor_hash = %anchor_hash,
239            cache_reused = tracing::field::Empty,
240            block_count = tracing::field::Empty,
241            parent_overlay_reused = tracing::field::Empty,
242        )
243    )]
244    fn get_overlay(
245        &self,
246        tip_hash: B256,
247        anchor_hash: B256,
248    ) -> Result<Arc<TrieInputSorted>, StateTrieOverlayError> {
249        let key = OverlayCacheKey { anchor_hash, tip_hash };
250        let span = tracing::Span::current();
251
252        if let Some(input) = self.overlays.get(&key).map(|entry| Arc::clone(entry.value())) {
253            self.metrics.overlay_cache_reuses.increment(1);
254            span.record("cache_reused", true);
255            return Ok(input)
256        }
257        span.record("cache_reused", false);
258
259        // Resolve the block path and any cached parent overlay before locking the child entry.
260        let mut hash = tip_hash;
261        let mut blocks = Vec::new();
262        loop {
263            let block =
264                self.blocks.get(&hash).ok_or(StateTrieOverlayError { tip_hash, anchor_hash })?;
265            let parent_hash = block.recovered_block().parent_hash();
266            blocks.push(block.clone());
267
268            if parent_hash == anchor_hash {
269                break
270            }
271            hash = parent_hash;
272        }
273        span.record("block_count", blocks.len());
274        let parent_input = blocks.first().and_then(|block| {
275            let parent_hash = block.recovered_block().parent_hash();
276            (parent_hash != anchor_hash)
277                .then(|| {
278                    self.overlays
279                        .get(&OverlayCacheKey { anchor_hash, tip_hash: parent_hash })
280                        .map(|entry| Arc::clone(entry.value()))
281                })
282                .flatten()
283        });
284        span.record("parent_overlay_reused", parent_input.is_some());
285        let compute_input = match parent_input {
286            Some(parent_input) => {
287                ComputeOverlayInput::ExtendCached { block: blocks.swap_remove(0), parent_input }
288            }
289            None => ComputeOverlayInput::MergeBlocks(blocks),
290        };
291
292        // The vacant entry is the cache-fill gate: racing callers block instead of recomputing.
293        let input = match self.overlays.entry(key) {
294            Entry::Occupied(entry) => {
295                self.metrics.overlay_cache_reuses.increment(1);
296                span.record("cache_reused", true);
297                return Ok(Arc::clone(entry.get()))
298            }
299            Entry::Vacant(entry) => {
300                self.metrics.overlay_cache_fills.increment(1);
301                let input = {
302                    #[cfg(feature = "rayon")]
303                    {
304                        if let Some(worker_pool) = &self.worker_pool {
305                            let compute_span = span;
306                            let metrics = self.metrics.clone();
307                            Arc::new(worker_pool.install_fn(move || {
308                                let _guard = compute_span.enter();
309                                compute_overlay(compute_input, anchor_hash, &metrics)
310                            }))
311                        } else {
312                            Arc::new(compute_overlay(compute_input, anchor_hash, &self.metrics))
313                        }
314                    }
315
316                    #[cfg(not(feature = "rayon"))]
317                    {
318                        Arc::new(compute_overlay(compute_input, anchor_hash, &self.metrics))
319                    }
320                };
321
322                entry.insert(Arc::clone(&input));
323                input
324            }
325        };
326
327        Ok(input)
328    }
329
330    /// Returns `preferred_anchor` if it is on the parent chain, otherwise the first missing parent.
331    ///
332    /// Returns `None` if `parent_hash` is not `preferred_anchor` and the manager does not contain a
333    /// block for `parent_hash`, meaning there is no in-memory parent chain to inspect.
334    pub fn anchor_for_parent(&self, parent_hash: B256, preferred_anchor: B256) -> Option<B256> {
335        Self::anchor_for_parent_in(self.blocks.as_ref(), parent_hash, preferred_anchor)
336    }
337
338    fn anchor_for_parent_in(
339        blocks: &DashMap<B256, ExecutedBlock<N>>,
340        parent_hash: B256,
341        preferred_anchor: B256,
342    ) -> Option<B256> {
343        if parent_hash == preferred_anchor {
344            return Some(preferred_anchor)
345        }
346
347        let mut hash = parent_hash;
348
349        loop {
350            let block_parent_hash = blocks.get(&hash)?.recovered_block().parent_hash();
351            if block_parent_hash == preferred_anchor {
352                return Some(block_parent_hash)
353            }
354            if !blocks.contains_key(&block_parent_hash) {
355                return Some(block_parent_hash)
356            }
357            hash = block_parent_hash;
358        }
359    }
360}
361
362/// Error returned when a state trie overlay cannot be built from the manager's current block set.
363#[derive(Debug)]
364pub struct StateTrieOverlayError {
365    /// Requested in-memory tip hash.
366    tip_hash: B256,
367    /// Requested anchor hash.
368    anchor_hash: B256,
369}
370
371impl fmt::Display for StateTrieOverlayError {
372    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373        write!(
374            f,
375            "state trie overlay for tip {} cannot be anchored to {} with current blocks",
376            self.tip_hash, self.anchor_hash
377        )
378    }
379}
380
381impl std::error::Error for StateTrieOverlayError {}
382
383#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
384struct OverlayCacheKey {
385    anchor_hash: B256,
386    tip_hash: B256,
387}
388
389enum ComputeOverlayInput<N: NodePrimitives> {
390    ExtendCached { block: ExecutedBlock<N>, parent_input: Arc<TrieInputSorted> },
391    MergeBlocks(Vec<ExecutedBlock<N>>),
392}
393
394#[tracing::instrument(
395    level = "trace",
396    target = "chain_state::state_trie_overlay",
397    skip_all,
398    fields(
399        anchor_hash = %anchor_hash,
400        block_count = tracing::field::Empty,
401        parent_overlay = tracing::field::Empty,
402        elapsed_us = tracing::field::Empty,
403    )
404)]
405fn compute_overlay<N: NodePrimitives>(
406    input: ComputeOverlayInput<N>,
407    anchor_hash: B256,
408    metrics: &StateTrieOverlayMetrics,
409) -> TrieInputSorted {
410    let started_at = Instant::now();
411    let block_count = match &input {
412        ComputeOverlayInput::ExtendCached { .. } => 1,
413        ComputeOverlayInput::MergeBlocks(blocks) => blocks.len(),
414    };
415    let parent_overlay = matches!(&input, ComputeOverlayInput::ExtendCached { .. });
416    tracing::Span::current().record("block_count", block_count);
417    tracing::Span::current().record("parent_overlay", parent_overlay);
418
419    let overlay = match input {
420        ComputeOverlayInput::ExtendCached { block, parent_input } => {
421            let trie_data = block.trie_data();
422
423            trace!(
424                target: "chain_state::state_trie_overlay",
425                %anchor_hash,
426                head = %block.recovered_block().hash(),
427                "extending cached parent state trie overlay"
428            );
429
430            let mut overlay = parent_input.as_ref().clone();
431            extend_overlay(&mut overlay, &trie_data.hashed_state, &trie_data.trie_updates);
432            overlay
433        }
434        ComputeOverlayInput::MergeBlocks(blocks) => merge_blocks(blocks),
435    };
436
437    let elapsed = started_at.elapsed();
438    metrics.overlay_computation_duration_seconds.record(elapsed.as_secs_f64());
439    tracing::Span::current().record("elapsed_us", elapsed.as_micros() as u64);
440    debug!(
441        target: "chain_state::state_trie_overlay",
442        %anchor_hash,
443        block_count,
444        parent_overlay,
445        ?elapsed,
446        "computed state trie overlay"
447    );
448
449    overlay
450}
451
452fn merge_blocks<N: NodePrimitives>(blocks: Vec<ExecutedBlock<N>>) -> TrieInputSorted {
453    let trie_data = blocks.iter().map(ExecutedBlock::trie_data).collect::<Vec<_>>();
454
455    #[cfg(feature = "rayon")]
456    let (nodes, state) = rayon::join(
457        || {
458            TrieUpdatesSorted::merge_batch(
459                trie_data.iter().map(|data| Arc::clone(&data.trie_updates)),
460            )
461        },
462        || {
463            HashedPostStateSorted::merge_batch(
464                trie_data.iter().map(|data| Arc::clone(&data.hashed_state)),
465            )
466        },
467    );
468
469    #[cfg(not(feature = "rayon"))]
470    let (nodes, state) = (
471        TrieUpdatesSorted::merge_batch(trie_data.iter().map(|data| Arc::clone(&data.trie_updates))),
472        HashedPostStateSorted::merge_batch(
473            trie_data.iter().map(|data| Arc::clone(&data.hashed_state)),
474        ),
475    );
476
477    TrieInputSorted::new(nodes, state, Default::default())
478}
479
480fn extend_overlay(
481    overlay: &mut TrieInputSorted,
482    hashed_state: &HashedPostStateSorted,
483    trie_updates: &TrieUpdatesSorted,
484) {
485    #[cfg(feature = "rayon")]
486    {
487        rayon::join(
488            || {
489                if !hashed_state.is_empty() {
490                    Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state);
491                }
492            },
493            || {
494                if !trie_updates.is_empty() {
495                    Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(trie_updates);
496                }
497            },
498        );
499    }
500
501    #[cfg(not(feature = "rayon"))]
502    {
503        if !hashed_state.is_empty() {
504            Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state);
505        }
506        if !trie_updates.is_empty() {
507            Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(trie_updates);
508        }
509    }
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{test_utils::TestBlockBuilder, ComputedTrieData, EthPrimitives, ExecutedBlock};
    use alloy_primitives::U256;
    use reth_primitives_traits::Account;
    use reth_trie::{updates::TrieUpdatesSorted, HashedPostState, HashedStorage};
    use std::sync::Arc;
    #[cfg(feature = "rayon")]
    use std::{
        thread,
        time::{Duration, Instant},
    };

    /// Rebuilds `block` with trie data touching one account and one storage slot derived from
    /// `id`, so each test block changes a distinct account.
    fn with_unique_state(
        block: &ExecutedBlock<EthPrimitives>,
        id: u8,
    ) -> ExecutedBlock<EthPrimitives> {
        let account = B256::with_last_byte(id);
        let slot = B256::with_last_byte(id.saturating_add(32));
        let storage = HashedStorage::from_iter(false, [(slot, U256::from(id))]);
        let hashed_state = HashedPostState::default()
            .with_accounts([(account, Some(Account::default()))])
            .with_storages([(account, storage)])
            .into_sorted();
        let trie_data =
            ComputedTrieData::new(Arc::new(hashed_state), Arc::new(TrieUpdatesSorted::default()));

        ExecutedBlock::new(
            Arc::clone(&block.recovered_block),
            Arc::clone(&block.execution_output),
            trie_data,
        )
    }

    /// Three consecutive test blocks, each with unique trie state (ids 1, 2, 3).
    fn test_blocks() -> Vec<ExecutedBlock<EthPrimitives>> {
        TestBlockBuilder::eth()
            .get_executed_blocks(1..4)
            .zip(1u8..)
            .map(|(block, id)| with_unique_state(&block, id))
            .collect()
    }

    /// Returns a manager without a worker pool that tracks every block in `blocks`.
    fn manager_with(blocks: &[ExecutedBlock<EthPrimitives>]) -> StateTrieOverlayManager {
        let manager = StateTrieOverlayManager::default();
        blocks.iter().cloned().for_each(|block| manager.insert_block(block));
        manager
    }

    #[test]
    fn errors_for_unknown_parent() {
        let manager = StateTrieOverlayManager::<EthPrimitives>::default();
        let (parent, anchor) = (B256::random(), B256::random());

        let err = manager.overlay_for_parent(parent, anchor).unwrap_err();

        assert_eq!(err.tip_hash, parent);
        assert_eq!(err.anchor_hash, anchor);
    }

    #[test]
    fn builds_managed_overlay_for_inserted_blocks() {
        let blocks = test_blocks();
        let manager = manager_with(&blocks);
        let tip = blocks[2].recovered_block().hash();

        // Anchored below the first block: all three blocks contribute.
        let full_anchor = blocks[0].recovered_block().parent_hash();
        let (_, full) = manager.overlay_for_parent(tip, full_anchor).unwrap();
        assert_eq!(full.accounts.len(), 3);

        // Anchored on the middle block: only the tip contributes.
        let short_anchor = blocks[1].recovered_block().hash();
        let (_, short) = manager.overlay_for_parent(tip, short_anchor).unwrap();
        assert_eq!(short.accounts.len(), 1);

        // A repeated request is served from the cache.
        let (_, cached_short) = manager.overlay_for_parent(tip, short_anchor).unwrap();
        assert!(Arc::ptr_eq(&short, &cached_short));
    }

    #[test]
    fn returns_anchor_for_in_memory_parent() {
        let blocks = test_blocks();
        let manager = manager_with(&blocks);
        let tip = blocks[2].recovered_block().hash();
        let first = blocks[0].recovered_block();

        // Unknown preferred anchor: the walk stops at the first parent not held in memory.
        assert_eq!(manager.anchor_for_parent(tip, B256::random()), Some(first.parent_hash()));

        // With the first block removed, the walk stops at its hash.
        manager.remove_blocks([first.hash()]);
        assert_eq!(manager.anchor_for_parent(tip, first.hash()), Some(first.hash()));
    }

    #[test]
    fn prefers_anchor_in_parent_chain() {
        let blocks = test_blocks();
        let manager = manager_with(&blocks);

        let db_tip_hash = blocks[1].recovered_block().hash();
        let anchor = manager.anchor_for_parent(blocks[2].recovered_block().hash(), db_tip_hash);
        assert_eq!(anchor, Some(db_tip_hash));
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn insert_block_prepares_child_overlay_from_cached_parent() {
        let manager = StateTrieOverlayManager::new(Arc::new(WorkerPool::new(2, "test-ovly")));
        let blocks = test_blocks();
        let (parent, child) = (&blocks[0], &blocks[1]);
        let anchor_hash = parent.recovered_block().parent_hash();
        let parent_hash = parent.recovered_block().hash();
        let child_hash = child.recovered_block().hash();

        // Warm the cache with the parent overlay so inserting the child schedules precomputation.
        manager.insert_block(parent.clone());
        manager.overlay_for_parent(parent_hash, anchor_hash).unwrap();
        manager.insert_block(child.clone());

        let child_key = OverlayCacheKey { anchor_hash, tip_hash: child_hash };
        let deadline = Instant::now() + Duration::from_secs(5);
        while !manager.overlays.contains_key(&child_key) {
            assert!(
                Instant::now() < deadline,
                "timed out waiting for optimistically prepared child overlay"
            );
            thread::sleep(Duration::from_millis(10));
        }

        let (_, state) = manager.overlay_for_parent(child_hash, anchor_hash).unwrap();
        assert_eq!(state.accounts.len(), 2);
    }

    #[test]
    fn prunes_cached_overlays_after_removing_blocks() {
        let blocks = test_blocks();
        let manager = manager_with(&blocks);
        let tip = blocks[2].recovered_block().hash();

        let original_anchor = blocks[0].recovered_block().parent_hash();
        manager.overlay_for_parent(tip, original_anchor).unwrap();

        manager.remove_blocks(blocks[..2].iter().map(|block| block.recovered_block().hash()));

        // The overlay from the original anchor depended on removed blocks and was pruned.
        assert!(manager.overlay_for_parent(tip, original_anchor).is_err());

        let anchor_hash = blocks[1].recovered_block().hash();
        let (_, state) = manager.overlay_for_parent(tip, anchor_hash).unwrap();
        assert_eq!(state.accounts.len(), 1);
    }
}