1use crate::{EthPrimitives, ExecutedBlock};
8use alloy_primitives::B256;
9use reth_metrics::{
10 metrics::{Counter, Histogram},
11 Metrics,
12};
13use reth_primitives_traits::{
14 dashmap::{mapref::entry::Entry, DashMap},
15 AlloyBlockHeader, NodePrimitives,
16};
17#[cfg(feature = "rayon")]
18use reth_tasks::WorkerPool;
19use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
20use std::{
21 fmt,
22 sync::{Arc, OnceLock},
23 time::Instant,
24};
25use tracing::{debug, trace};
26
27#[derive(Clone)]
32pub struct StateTrieOverlayManager<N: NodePrimitives = EthPrimitives> {
33 blocks: Arc<DashMap<B256, ExecutedBlock<N>>>,
34 overlays: Arc<DashMap<OverlayCacheKey, OverlayCacheEntry>>,
35 #[cfg(feature = "rayon")]
36 worker_pool: Option<Arc<WorkerPool>>,
37 metrics: StateTrieOverlayMetrics,
38}
39
40#[derive(Clone, Metrics)]
42#[metrics(scope = "sync.block_validation.state_trie_overlay")]
43struct StateTrieOverlayMetrics {
44 overlay_computation_duration_seconds: Histogram,
46 overlay_cache_reuses: Counter,
48 overlay_cache_fills: Counter,
50}
51
52impl<N: NodePrimitives> Default for StateTrieOverlayManager<N> {
53 fn default() -> Self {
54 Self {
55 blocks: Default::default(),
56 overlays: Default::default(),
57 #[cfg(feature = "rayon")]
58 worker_pool: None,
59 metrics: Default::default(),
60 }
61 }
62}
63
64impl<N: NodePrimitives> std::fmt::Debug for StateTrieOverlayManager<N> {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("StateTrieOverlayManager")
67 .field("blocks", &self.blocks.len())
68 .field("overlays", &self.overlays.len())
69 .finish()
70 }
71}
72
73impl<N: NodePrimitives> StateTrieOverlayManager<N> {
74 #[cfg(feature = "rayon")]
76 pub fn new(worker_pool: Arc<WorkerPool>) -> Self {
77 Self {
78 blocks: Default::default(),
79 overlays: Default::default(),
80 worker_pool: Some(worker_pool),
81 metrics: Default::default(),
82 }
83 }
84
85 #[tracing::instrument(
87 level = "trace",
88 target = "chain_state::state_trie_overlay",
89 skip_all,
90 fields(
91 block_hash = %block.recovered_block().hash(),
92 parent_hash = %block.recovered_block().parent_hash(),
93 duplicate = false,
94 )
95 )]
96 pub fn insert_block(&self, block: ExecutedBlock<N>) {
97 let hash = block.recovered_block().hash();
98 let parent_hash = block.recovered_block().parent_hash();
99 let span = tracing::Span::current();
100
101 match self.blocks.entry(hash) {
103 Entry::Occupied(_) => {
104 span.record("duplicate", true);
105 debug!(
106 target: "chain_state::state_trie_overlay",
107 %hash,
108 %parent_hash,
109 "state trie overlay block already inserted"
110 );
111 return
112 }
113 Entry::Vacant(entry) => {
114 entry.insert(block);
115 }
116 }
117
118 let cached_parent_overlays = self
121 .overlays
122 .iter()
123 .filter_map(|entry| {
124 let key = *entry.key();
125 (key.tip_hash == parent_hash && entry.value().is_ready()).then_some(key.anchor_hash)
126 })
127 .collect::<Vec<_>>();
128
129 debug!(
130 target: "chain_state::state_trie_overlay",
131 %hash,
132 %parent_hash,
133 "inserted block into state trie overlay manager"
134 );
135 if cached_parent_overlays.is_empty() {
136 return
137 }
138
139 #[cfg(feature = "rayon")]
140 let Some(worker_pool) = self.worker_pool.clone() else {
141 return
142 };
143
144 #[cfg(not(feature = "rayon"))]
145 let _ = cached_parent_overlays;
146
147 #[cfg(feature = "rayon")]
148 {
149 let parent_span = span;
150 for anchor_hash in cached_parent_overlays {
151 let manager = <Self as Clone>::clone(self);
152 let parent_span = parent_span.clone();
153 worker_pool.spawn(move || {
154 let _span = tracing::trace_span!(
155 target: "chain_state::state_trie_overlay",
156 parent: parent_span,
157 "precompute_state_trie_overlay",
158 tip_hash = %hash,
159 anchor_hash = %anchor_hash,
160 )
161 .entered();
162 let _ = manager.precompute_overlay(hash, anchor_hash);
163 });
164 }
165 }
166 }
167
168 #[tracing::instrument(
171 level = "trace",
172 target = "chain_state::state_trie_overlay",
173 skip_all,
174 fields(
175 block_count = tracing::field::Empty,
176 removed_blocks = tracing::field::Empty,
177 pruned_overlays = tracing::field::Empty,
178 )
179 )]
180 pub fn remove_blocks(&self, hashes: impl IntoIterator<Item = B256>) {
181 let span = tracing::Span::current();
182
183 let mut block_count = 0usize;
185 let mut removed_blocks = 0usize;
186 let mut pruned_overlays = 0usize;
187 for hash in hashes {
188 block_count += 1;
189 removed_blocks += self.blocks.remove(&hash).is_some() as usize;
190 }
191 span.record("block_count", block_count);
192 span.record("removed_blocks", removed_blocks);
193
194 if removed_blocks > 0 {
195 let overlays_before = self.overlays.len();
196 let blocks = Arc::clone(&self.blocks);
197 self.overlays.retain(|key, _| {
198 key.tip_hash != key.anchor_hash &&
199 Self::anchor_for_parent_in(blocks.as_ref(), key.tip_hash, key.anchor_hash) ==
200 Some(key.anchor_hash)
201 });
202 pruned_overlays = overlays_before.saturating_sub(self.overlays.len());
203 span.record("pruned_overlays", pruned_overlays);
204 }
205 debug!(
206 target: "chain_state::state_trie_overlay",
207 block_count,
208 removed_blocks,
209 pruned_overlays,
210 "removed blocks from state trie overlay manager"
211 );
212 }
213
214 #[tracing::instrument(
216 level = "trace",
217 target = "chain_state::state_trie_overlay",
218 skip_all,
219 fields(tip_hash = %parent_hash, anchor_hash = %anchor_hash)
220 )]
221 pub fn overlay_for_parent(
222 &self,
223 parent_hash: B256,
224 anchor_hash: B256,
225 ) -> Result<(Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>), StateTrieOverlayError> {
226 debug!(
227 target: "chain_state::state_trie_overlay",
228 tip_hash = %parent_hash,
229 %anchor_hash,
230 "loading state trie overlay for parent"
231 );
232 let input = self.get_overlay(parent_hash, anchor_hash)?;
233 Ok((Arc::clone(&input.nodes), Arc::clone(&input.state)))
234 }
235
236 #[cfg(feature = "rayon")]
237 fn precompute_overlay(
238 &self,
239 tip_hash: B256,
240 anchor_hash: B256,
241 ) -> Result<(), StateTrieOverlayError> {
242 let _ = self.get_overlay_inner(tip_hash, anchor_hash, OverlayLookupMode::Precompute)?;
243 Ok(())
244 }
245
246 #[tracing::instrument(
247 level = "trace",
248 target = "chain_state::state_trie_overlay",
249 skip_all,
250 fields(
251 tip_hash = %tip_hash,
252 anchor_hash = %anchor_hash,
253 cache_reused = tracing::field::Empty,
254 block_count = tracing::field::Empty,
255 parent_overlay_reused = tracing::field::Empty,
256 )
257 )]
258 fn get_overlay(
259 &self,
260 tip_hash: B256,
261 anchor_hash: B256,
262 ) -> Result<Arc<TrieInputSorted>, StateTrieOverlayError> {
263 self.get_overlay_inner(tip_hash, anchor_hash, OverlayLookupMode::Required)
264 .map(|input| input.expect("required overlay lookups always return an overlay"))
265 }
266
267 fn get_overlay_inner(
268 &self,
269 tip_hash: B256,
270 anchor_hash: B256,
271 mode: OverlayLookupMode,
272 ) -> Result<Option<Arc<TrieInputSorted>>, StateTrieOverlayError> {
273 let key = OverlayCacheKey { anchor_hash, tip_hash };
274 let span = tracing::Span::current();
275
276 if let Some(entry) = self.overlays.get(&key).map(|entry| entry.value().clone()) {
277 return Ok(match entry {
278 OverlayCacheEntry::Ready(input) => {
279 self.metrics.overlay_cache_reuses.increment(1);
280 span.record("cache_reused", true);
281 Some(input)
282 }
283 OverlayCacheEntry::Computing(waiter) => {
284 span.record("cache_reused", true);
285 if mode == OverlayLookupMode::Precompute {
286 None
287 } else {
288 self.metrics.overlay_cache_reuses.increment(1);
289 Some(waiter.wait())
290 }
291 }
292 })
293 }
294 span.record("cache_reused", false);
295
296 let mut hash = tip_hash;
298 let mut blocks = Vec::new();
299 loop {
300 let block =
301 self.blocks.get(&hash).ok_or(StateTrieOverlayError { tip_hash, anchor_hash })?;
302 let parent_hash = block.recovered_block().parent_hash();
303 blocks.push(block.clone());
304
305 if parent_hash == anchor_hash {
306 break
307 }
308 hash = parent_hash;
309 }
310 span.record("block_count", blocks.len());
311 let parent_input = blocks.first().and_then(|block| {
312 let parent_hash = block.recovered_block().parent_hash();
313 (parent_hash != anchor_hash)
314 .then(|| {
315 self.overlays
316 .get(&OverlayCacheKey { anchor_hash, tip_hash: parent_hash })
317 .and_then(|entry| entry.value().ready())
318 })
319 .flatten()
320 });
321 span.record("parent_overlay_reused", parent_input.is_some());
322 let compute_input = match parent_input {
323 Some(parent_input) => {
324 ComputeOverlayInput::ExtendCached { block: blocks.swap_remove(0), parent_input }
325 }
326 None => ComputeOverlayInput::MergeBlocks(blocks),
327 };
328
329 enum CacheAction {
330 Ready(Arc<TrieInputSorted>),
331 Wait(Arc<OverlayWaiter>),
332 Compute(Arc<OverlayWaiter>),
333 Skip,
334 }
335
336 let action = match self.overlays.entry(key) {
337 Entry::Occupied(entry) => match entry.get().clone() {
338 OverlayCacheEntry::Ready(input) => {
339 self.metrics.overlay_cache_reuses.increment(1);
340 span.record("cache_reused", true);
341 CacheAction::Ready(input)
342 }
343 OverlayCacheEntry::Computing(waiter) => {
344 span.record("cache_reused", true);
345 if mode == OverlayLookupMode::Precompute {
346 CacheAction::Skip
347 } else {
348 self.metrics.overlay_cache_reuses.increment(1);
349 CacheAction::Wait(waiter)
350 }
351 }
352 },
353 Entry::Vacant(entry) => {
354 self.metrics.overlay_cache_fills.increment(1);
355 let waiter = Arc::new(OverlayWaiter::new());
356 entry.insert(OverlayCacheEntry::Computing(Arc::clone(&waiter)));
357 CacheAction::Compute(waiter)
358 }
359 };
360
361 match action {
362 CacheAction::Ready(input) => Ok(Some(input)),
363 CacheAction::Wait(waiter) => Ok(Some(waiter.wait())),
364 CacheAction::Skip => Ok(None),
365 CacheAction::Compute(waiter) => {
366 let input = self.compute_overlay(compute_input, anchor_hash, span);
367 waiter.finish(Arc::clone(&input));
368
369 if let Entry::Occupied(mut entry) = self.overlays.entry(key) {
370 let should_publish = match entry.get() {
373 OverlayCacheEntry::Computing(existing) => Arc::ptr_eq(existing, &waiter),
374 OverlayCacheEntry::Ready(_) => false,
375 };
376 if should_publish {
377 entry.insert(OverlayCacheEntry::Ready(Arc::clone(&input)));
378 }
379 }
380
381 Ok(Some(input))
382 }
383 }
384 }
385
386 pub fn anchor_for_parent(&self, parent_hash: B256, preferred_anchor: B256) -> Option<B256> {
391 Self::anchor_for_parent_in(self.blocks.as_ref(), parent_hash, preferred_anchor)
392 }
393
394 fn anchor_for_parent_in(
395 blocks: &DashMap<B256, ExecutedBlock<N>>,
396 parent_hash: B256,
397 preferred_anchor: B256,
398 ) -> Option<B256> {
399 if parent_hash == preferred_anchor {
400 return Some(preferred_anchor)
401 }
402
403 let mut hash = parent_hash;
404
405 loop {
406 let block_parent_hash = blocks.get(&hash)?.recovered_block().parent_hash();
407 if block_parent_hash == preferred_anchor {
408 return Some(block_parent_hash)
409 }
410 if !blocks.contains_key(&block_parent_hash) {
411 return Some(block_parent_hash)
412 }
413 hash = block_parent_hash;
414 }
415 }
416
417 fn compute_overlay(
418 &self,
419 compute_input: ComputeOverlayInput<N>,
420 anchor_hash: B256,
421 _span: tracing::Span,
422 ) -> Arc<TrieInputSorted> {
423 #[cfg(feature = "rayon")]
424 {
425 if let Some(worker_pool) = &self.worker_pool {
426 let compute_span = _span;
427 let metrics = self.metrics.clone();
428 return Arc::new(worker_pool.install_fn(move || {
429 let _guard = compute_span.enter();
430 compute_overlay(compute_input, anchor_hash, &metrics)
431 }))
432 }
433 }
434
435 Arc::new(compute_overlay(compute_input, anchor_hash, &self.metrics))
436 }
437}
438
439#[derive(Debug)]
441pub struct StateTrieOverlayError {
442 tip_hash: B256,
444 anchor_hash: B256,
446}
447
448impl fmt::Display for StateTrieOverlayError {
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 write!(
451 f,
452 "state trie overlay for tip {} cannot be anchored to {} with current blocks",
453 self.tip_hash, self.anchor_hash
454 )
455 }
456}
457
458impl std::error::Error for StateTrieOverlayError {}
459
460#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
461struct OverlayCacheKey {
462 anchor_hash: B256,
463 tip_hash: B256,
464}
465
466#[derive(Clone)]
467enum OverlayCacheEntry {
468 Ready(Arc<TrieInputSorted>),
469 Computing(Arc<OverlayWaiter>),
470}
471
472impl OverlayCacheEntry {
473 const fn is_ready(&self) -> bool {
474 matches!(self, Self::Ready(_))
475 }
476
477 fn ready(&self) -> Option<Arc<TrieInputSorted>> {
478 match self {
479 Self::Ready(input) => Some(Arc::clone(input)),
480 Self::Computing(_) => None,
481 }
482 }
483}
484
485#[derive(Clone, Copy, Debug, Eq, PartialEq)]
486enum OverlayLookupMode {
487 Required,
488 Precompute,
489}
490
491struct OverlayWaiter {
492 input: OnceLock<Arc<TrieInputSorted>>,
493}
494
495impl OverlayWaiter {
496 const fn new() -> Self {
497 Self { input: OnceLock::new() }
498 }
499
500 fn wait(&self) -> Arc<TrieInputSorted> {
501 Arc::clone(self.input.wait())
502 }
503
504 fn finish(&self, computed: Arc<TrieInputSorted>) {
505 let _ = self.input.set(computed);
506 }
507}
508
509enum ComputeOverlayInput<N: NodePrimitives> {
510 ExtendCached { block: ExecutedBlock<N>, parent_input: Arc<TrieInputSorted> },
511 MergeBlocks(Vec<ExecutedBlock<N>>),
512}
513
514#[tracing::instrument(
515 level = "trace",
516 target = "chain_state::state_trie_overlay",
517 skip_all,
518 fields(
519 anchor_hash = %anchor_hash,
520 block_count = tracing::field::Empty,
521 parent_overlay = tracing::field::Empty,
522 elapsed_us = tracing::field::Empty,
523 )
524)]
525fn compute_overlay<N: NodePrimitives>(
526 input: ComputeOverlayInput<N>,
527 anchor_hash: B256,
528 metrics: &StateTrieOverlayMetrics,
529) -> TrieInputSorted {
530 let started_at = Instant::now();
531 let block_count = match &input {
532 ComputeOverlayInput::ExtendCached { .. } => 1,
533 ComputeOverlayInput::MergeBlocks(blocks) => blocks.len(),
534 };
535 let parent_overlay = matches!(&input, ComputeOverlayInput::ExtendCached { .. });
536 tracing::Span::current().record("block_count", block_count);
537 tracing::Span::current().record("parent_overlay", parent_overlay);
538
539 let overlay = match input {
540 ComputeOverlayInput::ExtendCached { block, parent_input } => {
541 let trie_data = block.trie_data();
542
543 trace!(
544 target: "chain_state::state_trie_overlay",
545 %anchor_hash,
546 head = %block.recovered_block().hash(),
547 "extending cached parent state trie overlay"
548 );
549
550 let mut overlay = parent_input.as_ref().clone();
551 extend_overlay(&mut overlay, &trie_data.hashed_state, &trie_data.trie_updates);
552 overlay
553 }
554 ComputeOverlayInput::MergeBlocks(blocks) => merge_blocks(blocks),
555 };
556
557 let elapsed = started_at.elapsed();
558 metrics.overlay_computation_duration_seconds.record(elapsed.as_secs_f64());
559 tracing::Span::current().record("elapsed_us", elapsed.as_micros() as u64);
560 debug!(
561 target: "chain_state::state_trie_overlay",
562 %anchor_hash,
563 block_count,
564 parent_overlay,
565 ?elapsed,
566 "computed state trie overlay"
567 );
568
569 overlay
570}
571
572fn merge_blocks<N: NodePrimitives>(blocks: Vec<ExecutedBlock<N>>) -> TrieInputSorted {
573 let trie_data = blocks.iter().map(ExecutedBlock::trie_data).collect::<Vec<_>>();
574
575 #[cfg(feature = "rayon")]
576 let (nodes, state) = rayon::join(
577 || {
578 TrieUpdatesSorted::merge_batch(
579 trie_data.iter().map(|data| Arc::clone(&data.trie_updates)),
580 )
581 },
582 || {
583 HashedPostStateSorted::merge_batch(
584 trie_data.iter().map(|data| Arc::clone(&data.hashed_state)),
585 )
586 },
587 );
588
589 #[cfg(not(feature = "rayon"))]
590 let (nodes, state) = (
591 TrieUpdatesSorted::merge_batch(trie_data.iter().map(|data| Arc::clone(&data.trie_updates))),
592 HashedPostStateSorted::merge_batch(
593 trie_data.iter().map(|data| Arc::clone(&data.hashed_state)),
594 ),
595 );
596
597 TrieInputSorted::new(nodes, state, Default::default())
598}
599
600fn extend_overlay(
601 overlay: &mut TrieInputSorted,
602 hashed_state: &HashedPostStateSorted,
603 trie_updates: &TrieUpdatesSorted,
604) {
605 #[cfg(feature = "rayon")]
606 {
607 rayon::join(
608 || {
609 if !hashed_state.is_empty() {
610 Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state);
611 }
612 },
613 || {
614 if !trie_updates.is_empty() {
615 Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(trie_updates);
616 }
617 },
618 );
619 }
620
621 #[cfg(not(feature = "rayon"))]
622 {
623 if !hashed_state.is_empty() {
624 Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state);
625 }
626 if !trie_updates.is_empty() {
627 Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(trie_updates);
628 }
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use super::*;
635 use crate::{test_utils::TestBlockBuilder, ComputedTrieData, EthPrimitives, ExecutedBlock};
636 use alloy_primitives::U256;
637 use reth_primitives_traits::Account;
638 use reth_trie::{updates::TrieUpdatesSorted, HashedPostState, HashedStorage};
639 #[cfg(feature = "rayon")]
640 use std::time::Instant;
641 use std::{
642 sync::{mpsc, Arc},
643 thread,
644 time::Duration,
645 };
646
647 fn with_unique_state(
648 block: &ExecutedBlock<EthPrimitives>,
649 id: u8,
650 ) -> ExecutedBlock<EthPrimitives> {
651 let hashed_address = B256::with_last_byte(id);
652 let hashed_slot = B256::with_last_byte(id.saturating_add(32));
653 let hashed_state = HashedPostState::default()
654 .with_accounts([(hashed_address, Some(Account::default()))])
655 .with_storages([(
656 hashed_address,
657 HashedStorage::from_iter(false, [(hashed_slot, U256::from(id))]),
658 )])
659 .into_sorted();
660
661 ExecutedBlock::new(
662 Arc::clone(&block.recovered_block),
663 Arc::clone(&block.execution_output),
664 ComputedTrieData::new(Arc::new(hashed_state), Arc::new(TrieUpdatesSorted::default())),
665 )
666 }
667
668 fn test_blocks() -> Vec<ExecutedBlock<EthPrimitives>> {
669 TestBlockBuilder::eth()
670 .get_executed_blocks(1..4)
671 .enumerate()
672 .map(|(index, block)| with_unique_state(&block, index as u8 + 1))
673 .collect()
674 }
675
676 #[test]
677 fn errors_for_unknown_parent() {
678 let manager = StateTrieOverlayManager::<EthPrimitives>::default();
679 let parent = B256::random();
680 let anchor = B256::random();
681
682 let err = manager.overlay_for_parent(parent, anchor).unwrap_err();
683
684 assert_eq!(err.tip_hash, parent);
685 assert_eq!(err.anchor_hash, anchor);
686 }
687
688 #[test]
689 fn builds_managed_overlay_for_inserted_blocks() {
690 let manager = StateTrieOverlayManager::default();
691 let blocks = test_blocks();
692 for block in &blocks {
693 manager.insert_block(block.clone());
694 }
695
696 let anchor_hash = blocks[0].recovered_block().parent_hash();
697
698 let (_, state) =
699 manager.overlay_for_parent(blocks[2].recovered_block().hash(), anchor_hash).unwrap();
700 assert_eq!(state.accounts.len(), 3);
701
702 let short_anchor = blocks[1].recovered_block().hash();
703 let (_, short) =
704 manager.overlay_for_parent(blocks[2].recovered_block().hash(), short_anchor).unwrap();
705 assert_eq!(short.accounts.len(), 1);
706 let (_, cached_short) =
707 manager.overlay_for_parent(blocks[2].recovered_block().hash(), short_anchor).unwrap();
708 assert!(Arc::ptr_eq(&short, &cached_short));
709 }
710
711 #[test]
712 fn returns_anchor_for_in_memory_parent() {
713 let manager = StateTrieOverlayManager::default();
714 let blocks = test_blocks();
715 for block in &blocks {
716 manager.insert_block(block.clone());
717 }
718
719 assert_eq!(
720 manager.anchor_for_parent(blocks[2].recovered_block().hash(), B256::random()),
721 Some(blocks[0].recovered_block().parent_hash())
722 );
723
724 manager.remove_blocks([blocks[0].recovered_block().hash()]);
725 assert_eq!(
726 manager.anchor_for_parent(
727 blocks[2].recovered_block().hash(),
728 blocks[0].recovered_block().hash()
729 ),
730 Some(blocks[0].recovered_block().hash())
731 );
732 }
733
734 #[test]
735 fn prefers_anchor_in_parent_chain() {
736 let manager = StateTrieOverlayManager::default();
737 let blocks = test_blocks();
738 for block in &blocks {
739 manager.insert_block(block.clone());
740 }
741
742 let db_tip_hash = blocks[1].recovered_block().hash();
743 assert_eq!(
744 manager.anchor_for_parent(blocks[2].recovered_block().hash(), db_tip_hash),
745 Some(db_tip_hash)
746 );
747 }
748
749 #[test]
750 fn required_lookup_waits_for_in_progress_overlay() {
751 let manager = StateTrieOverlayManager::<EthPrimitives>::default();
752 let key = OverlayCacheKey {
753 anchor_hash: B256::with_last_byte(1),
754 tip_hash: B256::with_last_byte(2),
755 };
756 let waiter = Arc::new(OverlayWaiter::new());
757 manager.overlays.insert(key, OverlayCacheEntry::Computing(Arc::clone(&waiter)));
758
759 let (tx, rx) = mpsc::channel();
760 thread::spawn(move || {
761 let res =
762 manager.overlay_for_parent(key.tip_hash, key.anchor_hash).map(|(_, state)| state);
763 tx.send(res).unwrap();
764 });
765
766 assert!(matches!(
767 rx.recv_timeout(Duration::from_millis(50)),
768 Err(mpsc::RecvTimeoutError::Timeout)
769 ));
770
771 waiter.finish(Arc::new(TrieInputSorted::default()));
772
773 let state = rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
774 assert!(state.is_empty());
775 }
776
777 #[cfg(feature = "rayon")]
778 #[test]
779 fn precompute_skips_in_progress_overlay() {
780 let manager = StateTrieOverlayManager::<EthPrimitives>::new(Arc::new(WorkerPool::new(
781 1,
782 "test-ovly",
783 )));
784 let key = OverlayCacheKey {
785 anchor_hash: B256::with_last_byte(1),
786 tip_hash: B256::with_last_byte(2),
787 };
788 manager.overlays.insert(key, OverlayCacheEntry::Computing(Arc::new(OverlayWaiter::new())));
789
790 let (tx, rx) = mpsc::channel();
791 thread::spawn(move || {
792 tx.send(manager.precompute_overlay(key.tip_hash, key.anchor_hash)).unwrap();
793 });
794
795 rx.recv_timeout(Duration::from_secs(1)).unwrap().unwrap();
796 }
797
798 #[cfg(feature = "rayon")]
799 #[test]
800 fn insert_block_prepares_child_overlay_from_cached_parent() {
801 let manager = StateTrieOverlayManager::new(Arc::new(WorkerPool::new(2, "test-ovly")));
802 let blocks = test_blocks();
803
804 manager.insert_block(blocks[0].clone());
805
806 let anchor_hash = blocks[0].recovered_block().parent_hash();
807 let parent_hash = blocks[0].recovered_block().hash();
808 manager.overlay_for_parent(parent_hash, anchor_hash).unwrap();
809
810 let child_hash = blocks[1].recovered_block().hash();
811 manager.insert_block(blocks[1].clone());
812
813 let child_key = OverlayCacheKey { anchor_hash, tip_hash: child_hash };
814 let deadline = Instant::now() + Duration::from_secs(5);
815 while !manager.overlays.contains_key(&child_key) {
816 assert!(
817 Instant::now() < deadline,
818 "timed out waiting for optimistically prepared child overlay"
819 );
820 thread::sleep(Duration::from_millis(10));
821 }
822
823 let (_, state) = manager.overlay_for_parent(child_hash, anchor_hash).unwrap();
824 assert_eq!(state.accounts.len(), 2);
825 }
826
827 #[test]
828 fn prunes_cached_overlays_after_removing_blocks() {
829 let manager = StateTrieOverlayManager::default();
830 let blocks = test_blocks();
831 for block in &blocks {
832 manager.insert_block(block.clone());
833 }
834
835 let original_anchor = blocks[0].recovered_block().parent_hash();
836 manager.overlay_for_parent(blocks[2].recovered_block().hash(), original_anchor).unwrap();
837
838 manager.remove_blocks([
839 blocks[0].recovered_block().hash(),
840 blocks[1].recovered_block().hash(),
841 ]);
842
843 let anchor_hash = blocks[1].recovered_block().hash();
844 assert!(manager
845 .overlay_for_parent(blocks[2].recovered_block().hash(), original_anchor)
846 .is_err());
847
848 let (_, state) =
849 manager.overlay_for_parent(blocks[2].recovered_block().hash(), anchor_hash).unwrap();
850 assert_eq!(state.accounts.len(), 1);
851 }
852}