1use crate::{EthPrimitives, ExecutedBlock};
8use alloy_primitives::B256;
9use reth_metrics::{
10 metrics::{Counter, Histogram},
11 Metrics,
12};
13use reth_primitives_traits::{
14 dashmap::{mapref::entry::Entry, DashMap},
15 AlloyBlockHeader, NodePrimitives,
16};
17#[cfg(feature = "rayon")]
18use reth_tasks::WorkerPool;
19use reth_trie::{updates::TrieUpdatesSorted, HashedPostStateSorted, TrieInputSorted};
20use std::{fmt, sync::Arc, time::Instant};
21use tracing::{debug, trace};
22
#[derive(Clone)]
/// Tracks executed in-memory blocks and caches the merged state/trie overlays built on top of
/// them.
///
/// An overlay is identified by an anchor hash and a tip hash (`OverlayCacheKey`). It holds the
/// combined trie updates and hashed post state of every tracked block after the anchor, up to
/// and including the tip.
///
/// Clones share the same block and overlay maps, because both are held behind `Arc`s.
pub struct StateTrieOverlayManager<N: NodePrimitives = EthPrimitives> {
    /// Executed blocks keyed by block hash.
    blocks: Arc<DashMap<B256, ExecutedBlock<N>>>,
    /// Cached overlays keyed by `(anchor_hash, tip_hash)`.
    overlays: Arc<DashMap<OverlayCacheKey, Arc<TrieInputSorted>>>,
    /// Optional pool used to compute overlays and to pre-compute child overlays on insert.
    #[cfg(feature = "rayon")]
    worker_pool: Option<Arc<WorkerPool>>,
    /// Cache and computation metrics.
    metrics: StateTrieOverlayMetrics,
}
35
#[derive(Clone, Metrics)]
/// Metrics for the state trie overlay manager.
#[metrics(scope = "sync.block_validation.state_trie_overlay")]
struct StateTrieOverlayMetrics {
    /// Time spent computing a state trie overlay.
    overlay_computation_duration_seconds: Histogram,
    /// Number of overlay requests served from the overlay cache.
    overlay_cache_reuses: Counter,
    /// Number of overlays computed and inserted into the overlay cache.
    overlay_cache_fills: Counter,
}
47
48impl<N: NodePrimitives> Default for StateTrieOverlayManager<N> {
49 fn default() -> Self {
50 Self {
51 blocks: Default::default(),
52 overlays: Default::default(),
53 #[cfg(feature = "rayon")]
54 worker_pool: None,
55 metrics: Default::default(),
56 }
57 }
58}
59
60impl<N: NodePrimitives> std::fmt::Debug for StateTrieOverlayManager<N> {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("StateTrieOverlayManager")
63 .field("blocks", &self.blocks.len())
64 .field("overlays", &self.overlays.len())
65 .finish()
66 }
67}
68
69impl<N: NodePrimitives> StateTrieOverlayManager<N> {
70 #[cfg(feature = "rayon")]
72 pub fn new(worker_pool: Arc<WorkerPool>) -> Self {
73 Self {
74 blocks: Default::default(),
75 overlays: Default::default(),
76 worker_pool: Some(worker_pool),
77 metrics: Default::default(),
78 }
79 }
80
81 #[tracing::instrument(
83 level = "trace",
84 target = "chain_state::state_trie_overlay",
85 skip_all,
86 fields(
87 block_hash = %block.recovered_block().hash(),
88 parent_hash = %block.recovered_block().parent_hash(),
89 duplicate = false,
90 )
91 )]
92 pub fn insert_block(&self, block: ExecutedBlock<N>) {
93 let hash = block.recovered_block().hash();
94 let parent_hash = block.recovered_block().parent_hash();
95 let span = tracing::Span::current();
96
97 match self.blocks.entry(hash) {
99 Entry::Occupied(_) => {
100 span.record("duplicate", true);
101 debug!(
102 target: "chain_state::state_trie_overlay",
103 %hash,
104 %parent_hash,
105 "state trie overlay block already inserted"
106 );
107 return
108 }
109 Entry::Vacant(entry) => {
110 entry.insert(block);
111 }
112 }
113
114 let cached_parent_overlays = self
117 .overlays
118 .iter()
119 .filter_map(|entry| {
120 let key = *entry.key();
121 (key.tip_hash == parent_hash).then_some(key.anchor_hash)
122 })
123 .collect::<Vec<_>>();
124
125 debug!(
126 target: "chain_state::state_trie_overlay",
127 %hash,
128 %parent_hash,
129 "inserted block into state trie overlay manager"
130 );
131 if cached_parent_overlays.is_empty() {
132 return
133 }
134
135 #[cfg(feature = "rayon")]
136 let Some(worker_pool) = self.worker_pool.clone() else {
137 return
138 };
139
140 #[cfg(not(feature = "rayon"))]
141 let _ = cached_parent_overlays;
142
143 #[cfg(feature = "rayon")]
144 {
145 let parent_span = span;
146 for anchor_hash in cached_parent_overlays {
147 let manager = <Self as Clone>::clone(self);
148 let parent_span = parent_span.clone();
149 worker_pool.spawn(move || {
150 let _span = tracing::trace_span!(
151 target: "chain_state::state_trie_overlay",
152 parent: parent_span,
153 "precompute_state_trie_overlay",
154 tip_hash = %hash,
155 anchor_hash = %anchor_hash,
156 )
157 .entered();
158 let _ = manager.get_overlay(hash, anchor_hash);
159 });
160 }
161 }
162 }
163
164 #[tracing::instrument(
167 level = "trace",
168 target = "chain_state::state_trie_overlay",
169 skip_all,
170 fields(
171 block_count = tracing::field::Empty,
172 removed_blocks = tracing::field::Empty,
173 pruned_overlays = tracing::field::Empty,
174 )
175 )]
176 pub fn remove_blocks(&self, hashes: impl IntoIterator<Item = B256>) {
177 let span = tracing::Span::current();
178
179 let mut block_count = 0usize;
181 let mut removed_blocks = 0usize;
182 let mut pruned_overlays = 0usize;
183 for hash in hashes {
184 block_count += 1;
185 removed_blocks += self.blocks.remove(&hash).is_some() as usize;
186 }
187 span.record("block_count", block_count);
188 span.record("removed_blocks", removed_blocks);
189
190 if removed_blocks > 0 {
191 let overlays_before = self.overlays.len();
192 let blocks = Arc::clone(&self.blocks);
193 self.overlays.retain(|key, _| {
194 key.tip_hash != key.anchor_hash &&
195 Self::anchor_for_parent_in(blocks.as_ref(), key.tip_hash, key.anchor_hash) ==
196 Some(key.anchor_hash)
197 });
198 pruned_overlays = overlays_before.saturating_sub(self.overlays.len());
199 span.record("pruned_overlays", pruned_overlays);
200 }
201 debug!(
202 target: "chain_state::state_trie_overlay",
203 block_count,
204 removed_blocks,
205 pruned_overlays,
206 "removed blocks from state trie overlay manager"
207 );
208 }
209
210 #[tracing::instrument(
212 level = "trace",
213 target = "chain_state::state_trie_overlay",
214 skip_all,
215 fields(tip_hash = %parent_hash, anchor_hash = %anchor_hash)
216 )]
217 pub fn overlay_for_parent(
218 &self,
219 parent_hash: B256,
220 anchor_hash: B256,
221 ) -> Result<(Arc<TrieUpdatesSorted>, Arc<HashedPostStateSorted>), StateTrieOverlayError> {
222 debug!(
223 target: "chain_state::state_trie_overlay",
224 tip_hash = %parent_hash,
225 %anchor_hash,
226 "loading state trie overlay for parent"
227 );
228 let input = self.get_overlay(parent_hash, anchor_hash)?;
229 Ok((Arc::clone(&input.nodes), Arc::clone(&input.state)))
230 }
231
232 #[tracing::instrument(
233 level = "trace",
234 target = "chain_state::state_trie_overlay",
235 skip_all,
236 fields(
237 tip_hash = %tip_hash,
238 anchor_hash = %anchor_hash,
239 cache_reused = tracing::field::Empty,
240 block_count = tracing::field::Empty,
241 parent_overlay_reused = tracing::field::Empty,
242 )
243 )]
244 fn get_overlay(
245 &self,
246 tip_hash: B256,
247 anchor_hash: B256,
248 ) -> Result<Arc<TrieInputSorted>, StateTrieOverlayError> {
249 let key = OverlayCacheKey { anchor_hash, tip_hash };
250 let span = tracing::Span::current();
251
252 if let Some(input) = self.overlays.get(&key).map(|entry| Arc::clone(entry.value())) {
253 self.metrics.overlay_cache_reuses.increment(1);
254 span.record("cache_reused", true);
255 return Ok(input)
256 }
257 span.record("cache_reused", false);
258
259 let mut hash = tip_hash;
261 let mut blocks = Vec::new();
262 loop {
263 let block =
264 self.blocks.get(&hash).ok_or(StateTrieOverlayError { tip_hash, anchor_hash })?;
265 let parent_hash = block.recovered_block().parent_hash();
266 blocks.push(block.clone());
267
268 if parent_hash == anchor_hash {
269 break
270 }
271 hash = parent_hash;
272 }
273 span.record("block_count", blocks.len());
274 let parent_input = blocks.first().and_then(|block| {
275 let parent_hash = block.recovered_block().parent_hash();
276 (parent_hash != anchor_hash)
277 .then(|| {
278 self.overlays
279 .get(&OverlayCacheKey { anchor_hash, tip_hash: parent_hash })
280 .map(|entry| Arc::clone(entry.value()))
281 })
282 .flatten()
283 });
284 span.record("parent_overlay_reused", parent_input.is_some());
285 let compute_input = match parent_input {
286 Some(parent_input) => {
287 ComputeOverlayInput::ExtendCached { block: blocks.swap_remove(0), parent_input }
288 }
289 None => ComputeOverlayInput::MergeBlocks(blocks),
290 };
291
292 let input = match self.overlays.entry(key) {
294 Entry::Occupied(entry) => {
295 self.metrics.overlay_cache_reuses.increment(1);
296 span.record("cache_reused", true);
297 return Ok(Arc::clone(entry.get()))
298 }
299 Entry::Vacant(entry) => {
300 self.metrics.overlay_cache_fills.increment(1);
301 let input = {
302 #[cfg(feature = "rayon")]
303 {
304 if let Some(worker_pool) = &self.worker_pool {
305 let compute_span = span;
306 let metrics = self.metrics.clone();
307 Arc::new(worker_pool.install_fn(move || {
308 let _guard = compute_span.enter();
309 compute_overlay(compute_input, anchor_hash, &metrics)
310 }))
311 } else {
312 Arc::new(compute_overlay(compute_input, anchor_hash, &self.metrics))
313 }
314 }
315
316 #[cfg(not(feature = "rayon"))]
317 {
318 Arc::new(compute_overlay(compute_input, anchor_hash, &self.metrics))
319 }
320 };
321
322 entry.insert(Arc::clone(&input));
323 input
324 }
325 };
326
327 Ok(input)
328 }
329
330 pub fn anchor_for_parent(&self, parent_hash: B256, preferred_anchor: B256) -> Option<B256> {
335 Self::anchor_for_parent_in(self.blocks.as_ref(), parent_hash, preferred_anchor)
336 }
337
338 fn anchor_for_parent_in(
339 blocks: &DashMap<B256, ExecutedBlock<N>>,
340 parent_hash: B256,
341 preferred_anchor: B256,
342 ) -> Option<B256> {
343 if parent_hash == preferred_anchor {
344 return Some(preferred_anchor)
345 }
346
347 let mut hash = parent_hash;
348
349 loop {
350 let block_parent_hash = blocks.get(&hash)?.recovered_block().parent_hash();
351 if block_parent_hash == preferred_anchor {
352 return Some(block_parent_hash)
353 }
354 if !blocks.contains_key(&block_parent_hash) {
355 return Some(block_parent_hash)
356 }
357 hash = block_parent_hash;
358 }
359 }
360}
361
362#[derive(Debug)]
364pub struct StateTrieOverlayError {
365 tip_hash: B256,
367 anchor_hash: B256,
369}
370
371impl fmt::Display for StateTrieOverlayError {
372 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
373 write!(
374 f,
375 "state trie overlay for tip {} cannot be anchored to {} with current blocks",
376 self.tip_hash, self.anchor_hash
377 )
378 }
379}
380
381impl std::error::Error for StateTrieOverlayError {}
382
/// Cache key for an overlay: the hash it is anchored to, plus the newest block it includes.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct OverlayCacheKey {
    /// Hash the overlay is anchored to. The block with this hash is not part of the overlay.
    anchor_hash: B256,
    /// Hash of the newest block included in the overlay.
    tip_hash: B256,
}
388
/// Work item for `compute_overlay`.
enum ComputeOverlayInput<N: NodePrimitives> {
    /// Apply only the tip `block` on top of the cached overlay of its parent (same anchor).
    ExtendCached { block: ExecutedBlock<N>, parent_input: Arc<TrieInputSorted> },
    /// Merge every block between anchor and tip from scratch; the tip comes first.
    MergeBlocks(Vec<ExecutedBlock<N>>),
}
393
394#[tracing::instrument(
395 level = "trace",
396 target = "chain_state::state_trie_overlay",
397 skip_all,
398 fields(
399 anchor_hash = %anchor_hash,
400 block_count = tracing::field::Empty,
401 parent_overlay = tracing::field::Empty,
402 elapsed_us = tracing::field::Empty,
403 )
404)]
405fn compute_overlay<N: NodePrimitives>(
406 input: ComputeOverlayInput<N>,
407 anchor_hash: B256,
408 metrics: &StateTrieOverlayMetrics,
409) -> TrieInputSorted {
410 let started_at = Instant::now();
411 let block_count = match &input {
412 ComputeOverlayInput::ExtendCached { .. } => 1,
413 ComputeOverlayInput::MergeBlocks(blocks) => blocks.len(),
414 };
415 let parent_overlay = matches!(&input, ComputeOverlayInput::ExtendCached { .. });
416 tracing::Span::current().record("block_count", block_count);
417 tracing::Span::current().record("parent_overlay", parent_overlay);
418
419 let overlay = match input {
420 ComputeOverlayInput::ExtendCached { block, parent_input } => {
421 let trie_data = block.trie_data();
422
423 trace!(
424 target: "chain_state::state_trie_overlay",
425 %anchor_hash,
426 head = %block.recovered_block().hash(),
427 "extending cached parent state trie overlay"
428 );
429
430 let mut overlay = parent_input.as_ref().clone();
431 extend_overlay(&mut overlay, &trie_data.hashed_state, &trie_data.trie_updates);
432 overlay
433 }
434 ComputeOverlayInput::MergeBlocks(blocks) => merge_blocks(blocks),
435 };
436
437 let elapsed = started_at.elapsed();
438 metrics.overlay_computation_duration_seconds.record(elapsed.as_secs_f64());
439 tracing::Span::current().record("elapsed_us", elapsed.as_micros() as u64);
440 debug!(
441 target: "chain_state::state_trie_overlay",
442 %anchor_hash,
443 block_count,
444 parent_overlay,
445 ?elapsed,
446 "computed state trie overlay"
447 );
448
449 overlay
450}
451
452fn merge_blocks<N: NodePrimitives>(blocks: Vec<ExecutedBlock<N>>) -> TrieInputSorted {
453 let trie_data = blocks.iter().map(ExecutedBlock::trie_data).collect::<Vec<_>>();
454
455 #[cfg(feature = "rayon")]
456 let (nodes, state) = rayon::join(
457 || {
458 TrieUpdatesSorted::merge_batch(
459 trie_data.iter().map(|data| Arc::clone(&data.trie_updates)),
460 )
461 },
462 || {
463 HashedPostStateSorted::merge_batch(
464 trie_data.iter().map(|data| Arc::clone(&data.hashed_state)),
465 )
466 },
467 );
468
469 #[cfg(not(feature = "rayon"))]
470 let (nodes, state) = (
471 TrieUpdatesSorted::merge_batch(trie_data.iter().map(|data| Arc::clone(&data.trie_updates))),
472 HashedPostStateSorted::merge_batch(
473 trie_data.iter().map(|data| Arc::clone(&data.hashed_state)),
474 ),
475 );
476
477 TrieInputSorted::new(nodes, state, Default::default())
478}
479
480fn extend_overlay(
481 overlay: &mut TrieInputSorted,
482 hashed_state: &HashedPostStateSorted,
483 trie_updates: &TrieUpdatesSorted,
484) {
485 #[cfg(feature = "rayon")]
486 {
487 rayon::join(
488 || {
489 if !hashed_state.is_empty() {
490 Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state);
491 }
492 },
493 || {
494 if !trie_updates.is_empty() {
495 Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(trie_updates);
496 }
497 },
498 );
499 }
500
501 #[cfg(not(feature = "rayon"))]
502 {
503 if !hashed_state.is_empty() {
504 Arc::make_mut(&mut overlay.state).extend_ref_and_sort(hashed_state);
505 }
506 if !trie_updates.is_empty() {
507 Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(trie_updates);
508 }
509 }
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{test_utils::TestBlockBuilder, ComputedTrieData, EthPrimitives, ExecutedBlock};
    use alloy_primitives::U256;
    use reth_primitives_traits::Account;
    use reth_trie::{updates::TrieUpdatesSorted, HashedPostState, HashedStorage};
    use std::sync::Arc;
    #[cfg(feature = "rayon")]
    use std::{
        thread,
        time::{Duration, Instant},
    };

    /// Returns a copy of `block` whose trie data is replaced by exactly one account and one
    /// storage slot derived from `id`. Each test block therefore contributes a distinct hashed
    /// account, and account counts in an overlay equal the number of merged blocks.
    fn with_unique_state(
        block: &ExecutedBlock<EthPrimitives>,
        id: u8,
    ) -> ExecutedBlock<EthPrimitives> {
        let hashed_address = B256::with_last_byte(id);
        let hashed_slot = B256::with_last_byte(id.saturating_add(32));
        let hashed_state = HashedPostState::default()
            .with_accounts([(hashed_address, Some(Account::default()))])
            .with_storages([(
                hashed_address,
                HashedStorage::from_iter(false, [(hashed_slot, U256::from(id))]),
            )])
            .into_sorted();

        ExecutedBlock::new(
            Arc::clone(&block.recovered_block),
            Arc::clone(&block.execution_output),
            ComputedTrieData::new(Arc::new(hashed_state), Arc::new(TrieUpdatesSorted::default())),
        )
    }

    /// Three executed test blocks with unique states (ids 1, 2, 3). The tests below rely on
    /// `blocks[i]` being the parent of `blocks[i + 1]`.
    fn test_blocks() -> Vec<ExecutedBlock<EthPrimitives>> {
        TestBlockBuilder::eth()
            .get_executed_blocks(1..4)
            .enumerate()
            .map(|(index, block)| with_unique_state(&block, index as u8 + 1))
            .collect()
    }

    /// An empty manager reports the requested tip and anchor in its error.
    #[test]
    fn errors_for_unknown_parent() {
        let manager = StateTrieOverlayManager::<EthPrimitives>::default();
        let parent = B256::random();
        let anchor = B256::random();

        let err = manager.overlay_for_parent(parent, anchor).unwrap_err();

        assert_eq!(err.tip_hash, parent);
        assert_eq!(err.anchor_hash, anchor);
    }

    /// Full-chain and shorter overlays contain one account per merged block, and a repeated
    /// request returns the very same cached `Arc`.
    #[test]
    fn builds_managed_overlay_for_inserted_blocks() {
        let manager = StateTrieOverlayManager::default();
        let blocks = test_blocks();
        for block in &blocks {
            manager.insert_block(block.clone());
        }

        let anchor_hash = blocks[0].recovered_block().parent_hash();

        let (_, state) =
            manager.overlay_for_parent(blocks[2].recovered_block().hash(), anchor_hash).unwrap();
        assert_eq!(state.accounts.len(), 3);

        let short_anchor = blocks[1].recovered_block().hash();
        let (_, short) =
            manager.overlay_for_parent(blocks[2].recovered_block().hash(), short_anchor).unwrap();
        assert_eq!(short.accounts.len(), 1);
        let (_, cached_short) =
            manager.overlay_for_parent(blocks[2].recovered_block().hash(), short_anchor).unwrap();
        assert!(Arc::ptr_eq(&short, &cached_short));
    }

    /// With an unknown preferred anchor, the parent of the oldest tracked block is returned.
    /// A preferred anchor equal to the parent of the oldest remaining block is kept.
    #[test]
    fn returns_anchor_for_in_memory_parent() {
        let manager = StateTrieOverlayManager::default();
        let blocks = test_blocks();
        for block in &blocks {
            manager.insert_block(block.clone());
        }

        assert_eq!(
            manager.anchor_for_parent(blocks[2].recovered_block().hash(), B256::random()),
            Some(blocks[0].recovered_block().parent_hash())
        );

        manager.remove_blocks([blocks[0].recovered_block().hash()]);
        assert_eq!(
            manager.anchor_for_parent(
                blocks[2].recovered_block().hash(),
                blocks[0].recovered_block().hash()
            ),
            Some(blocks[0].recovered_block().hash())
        );
    }

    /// A preferred anchor that lies on the tracked parent chain is returned as-is.
    #[test]
    fn prefers_anchor_in_parent_chain() {
        let manager = StateTrieOverlayManager::default();
        let blocks = test_blocks();
        for block in &blocks {
            manager.insert_block(block.clone());
        }

        let db_tip_hash = blocks[1].recovered_block().hash();
        assert_eq!(
            manager.anchor_for_parent(blocks[2].recovered_block().hash(), db_tip_hash),
            Some(db_tip_hash)
        );
    }

    /// Inserting a child of a block whose overlay is cached triggers a background computation
    /// of the child overlay on the worker pool. The test polls (up to 5s) until it appears.
    #[cfg(feature = "rayon")]
    #[test]
    fn insert_block_prepares_child_overlay_from_cached_parent() {
        let manager = StateTrieOverlayManager::new(Arc::new(WorkerPool::new(2, "test-ovly")));
        let blocks = test_blocks();

        manager.insert_block(blocks[0].clone());

        let anchor_hash = blocks[0].recovered_block().parent_hash();
        let parent_hash = blocks[0].recovered_block().hash();
        manager.overlay_for_parent(parent_hash, anchor_hash).unwrap();

        let child_hash = blocks[1].recovered_block().hash();
        manager.insert_block(blocks[1].clone());

        let child_key = OverlayCacheKey { anchor_hash, tip_hash: child_hash };
        let deadline = Instant::now() + Duration::from_secs(5);
        while !manager.overlays.contains_key(&child_key) {
            assert!(
                Instant::now() < deadline,
                "timed out waiting for optimistically prepared child overlay"
            );
            thread::sleep(Duration::from_millis(10));
        }

        let (_, state) = manager.overlay_for_parent(child_hash, anchor_hash).unwrap();
        assert_eq!(state.accounts.len(), 2);
    }

    /// Removing blocks prunes overlays whose chain is broken. The old anchor then fails, while
    /// an overlay anchored at a removed block's hash can still be built from the remaining tip.
    #[test]
    fn prunes_cached_overlays_after_removing_blocks() {
        let manager = StateTrieOverlayManager::default();
        let blocks = test_blocks();
        for block in &blocks {
            manager.insert_block(block.clone());
        }

        let original_anchor = blocks[0].recovered_block().parent_hash();
        manager.overlay_for_parent(blocks[2].recovered_block().hash(), original_anchor).unwrap();

        manager.remove_blocks([
            blocks[0].recovered_block().hash(),
            blocks[1].recovered_block().hash(),
        ]);

        let anchor_hash = blocks[1].recovered_block().hash();
        assert!(manager
            .overlay_for_parent(blocks[2].recovered_block().hash(), original_anchor)
            .is_err());

        let (_, state) =
            manager.overlay_for_parent(blocks[2].recovered_block().hash(), anchor_hash).unwrap();
        assert_eq!(state.accounts.len(), 1);
    }
}