use crate::{DatabaseHashedPostState, DatabaseStateRoot, DatabaseTrieCursorFactory};
use alloy_primitives::{map::B256Map, BlockNumber, B256};
use parking_lot::RwLock;
use reth_storage_api::{
    BlockNumReader, ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_trie::{
    changesets::compute_trie_changesets,
    trie_cursor::{InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory},
    HashedPostStateSorted, KeccakKeyHasher, StateRoot, TrieInputSorted,
};
use reth_trie_common::updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted};
use std::{
    collections::{BTreeMap, HashMap},
    ops::RangeInclusive,
    sync::Arc,
    time::Instant,
};
use tracing::debug;

#[cfg(feature = "metrics")]
use reth_metrics::{
    metrics::{Counter, Gauge},
    Metrics,
};

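/// Computes the trie changesets for the given block from database state.
///
/// The hashed state reverts for the block itself and for all blocks above it are replayed to
/// reconstruct the trie overlays before and after the block, and the per-node changes recorded
/// for that block (the old node values) are returned.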
pub fn compute_block_trie_changesets<Provider>(
    provider: &Provider,
    block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader,
{
    debug!(
        target: "trie::changeset_cache",
        block_number,
        "Computing block trie changesets from database state"
    );

    // State changes introduced by this block alone.
    let individual_state_revert = HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(
        provider,
        block_number..=block_number,
    )?;

    // State reverts for all blocks above this one, i.e. the overlay that rolls the current
    // database state back to the state at `block_number`.
    let cumulative_state_revert =
        HashedPostStateSorted::from_reverts::<KeccakKeyHasher>(provider, (block_number + 1)..)?;

    // Reverts back to the parent state at `block_number - 1`.
    let mut cumulative_state_revert_prev = cumulative_state_revert.clone();
    cumulative_state_revert_prev.extend_ref_and_sort(&individual_state_revert);

    // Compute the trie node overlay that rolls the current trie back to the parent state.
    let prefix_sets_prev = cumulative_state_revert_prev.construct_prefix_sets();
    let input_prev = TrieInputSorted::new(
        Arc::default(),
        Arc::new(cumulative_state_revert_prev),
        prefix_sets_prev,
    );

    let cumulative_trie_updates_prev =
        StateRoot::overlay_root_from_nodes_with_updates(provider.tx_ref(), input_prev)
            .map_err(ProviderError::other)?
            .1
            .into_sorted();

    let prefix_sets = individual_state_revert.construct_prefix_sets();

    // Apply this block's state changes on top of the parent trie overlay to obtain the trie
    // updates produced by this block.
    let input = TrieInputSorted::new(
        Arc::new(cumulative_trie_updates_prev.clone()),
        Arc::new(cumulative_state_revert),
        prefix_sets,
    );

    let trie_updates = StateRoot::overlay_root_from_nodes_with_updates(provider.tx_ref(), input)
        .map_err(ProviderError::other)?
        .1
        .into_sorted();

    // Walk the parent trie (database overlaid with the parent rollback) to record the old value
    // of every node touched by this block's updates.
    let db_cursor_factory = DatabaseTrieCursorFactory::new(provider.tx_ref());
    let overlay_factory =
        InMemoryTrieCursorFactory::new(db_cursor_factory, &cumulative_trie_updates_prev);

    let changesets =
        compute_trie_changesets(&overlay_factory, &trie_updates).map_err(ProviderError::other)?;

    debug!(
        target: "trie::changeset_cache",
        block_number,
        num_account_nodes = changesets.account_nodes_ref().len(),
        num_storage_tries = changesets.storage_tries_ref().len(),
        "Computed block trie changesets successfully"
    );

    Ok(changesets)
}

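/// Reconstructs the sorted trie updates for the given block.
///
/// The block's changesets record which trie nodes changed; this looks up the value of each of
/// those nodes in the database overlaid with the accumulated trie reverts for all blocks above
/// `block_number`, yielding the node values as of that block.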
pub fn compute_block_trie_updates<Provider>(
    cache: &ChangesetCache,
    provider: &Provider,
    block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader,
{
    let tx = provider.tx_ref();

    // The last block whose state has been persisted to the database.
    let db_tip_block = provider
        .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
        .as_ref()
        .map(|chk| chk.block_number)
        .ok_or_else(|| ProviderError::InsufficientChangesets {
            requested: block_number,
            available: 0..=0,
        })?;

    let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
        ProviderError::other(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("block hash not found for block number {block_number}"),
        ))
    })?;

    // Changesets recording which trie nodes were touched by this block.
    let changesets = cache.get_or_compute(block_hash, block_number, provider)?;

    // Accumulated trie reverts for all blocks above this one, used to view the trie as it was
    // at `block_number` rather than at the database tip.
    let reverts = cache.get_or_compute_range(provider, (block_number + 1)..=db_tip_block)?;

    let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
    let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);

    // For every account trie node touched by the block, look up its value as of `block_number`.
    let mut account_nodes = Vec::new();
    let mut account_cursor = cursor_factory.account_trie_cursor()?;

    for (nibbles, _old_node) in changesets.account_nodes_ref() {
        let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
        account_nodes.push((*nibbles, node_value));
    }

    // Do the same for every touched storage trie node.
    let mut storage_tries = B256Map::default();

    for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
        let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
        let mut storage_nodes = Vec::new();

        for (nibbles, _old_node) in storage_changeset.storage_nodes_ref() {
            let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
            storage_nodes.push((*nibbles, node_value));
        }

        storage_tries.insert(
            *hashed_address,
            StorageTrieUpdatesSorted { storage_nodes, is_deleted: storage_changeset.is_deleted },
        );
    }

    Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
}

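/// A thread-safe cache of per-block trie changesets keyed by block hash.
///
/// Cloning is cheap: clones share the same underlying cache state.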
#[derive(Debug, Clone)]
pub struct ChangesetCache {
    inner: Arc<RwLock<ChangesetCacheInner>>,
}

impl Default for ChangesetCache {
    fn default() -> Self {
        Self::new()
    }
}

impl ChangesetCache {
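    /// Creates an empty changeset cache.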
    pub fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(ChangesetCacheInner::new())) }
    }

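    /// Returns the cached changesets for the given block hash, if present.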
    pub fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
        self.inner.read().get(block_hash)
    }

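    /// Inserts the changesets for the given block hash and number into the cache.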
    pub fn insert(&self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        self.inner.write().insert(block_hash, block_number, changesets)
    }

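    /// Evicts all cached entries for blocks with a number strictly below `up_to_block`.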
    pub fn evict(&self, up_to_block: BlockNumber) {
        self.inner.write().evict(up_to_block)
    }

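    /// Returns the cached changesets for the given block, computing and caching them from the
    /// database on a cache miss.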
    pub fn get_or_compute<P>(
        &self,
        block_hash: B256,
        block_number: u64,
        provider: &P,
    ) -> ProviderResult<Arc<TrieUpdatesSorted>>
    where
        P: DBProvider
            + StageCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader,
    {
        // Fast path: take only a read lock while checking the cache.
        {
            let cache = self.inner.read();
            if let Some(changesets) = cache.get(&block_hash) {
                debug!(
                    target: "trie::changeset_cache",
                    ?block_hash,
                    block_number,
                    "Changeset cache HIT"
                );
                return Ok(changesets);
            }
        }

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset cache MISS, computing from database"
        );

        let start = Instant::now();

        // Compute outside of the write lock so concurrent readers are not blocked while the
        // changesets are derived from the database.
        let changesets = compute_block_trie_changesets(provider, block_number)?;

        let changesets = Arc::new(changesets);
        let elapsed = start.elapsed();

        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            block_number,
            ?block_hash,
            "Changeset computed from database, inserting into cache"
        );

        {
            let mut cache = self.inner.write();
            cache.insert(block_hash, block_number, Arc::clone(&changesets));
        }

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset successfully cached"
        );

        Ok(changesets)
    }

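    /// Accumulates the trie changesets for all blocks in the given range, newest to oldest,
    /// fetching each block's changesets from the cache or computing them from the database.
    ///
    /// Returns an error if the end of the range is above the last block persisted to the
    /// database.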
    pub fn get_or_compute_range<P>(
        &self,
        provider: &P,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<TrieUpdatesSorted>
    where
        P: DBProvider
            + StageCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader,
    {
        let db_tip_block = provider
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: *range.start(),
                available: 0..=0,
            })?;

        let start_block = *range.start();
        let end_block = *range.end();

        // Changesets can only be computed for blocks that have been persisted to the database.
        if end_block > db_tip_block {
            return Err(ProviderError::InsufficientChangesets {
                requested: end_block,
                available: 0..=db_tip_block,
            });
        }

        let timer = Instant::now();

        debug!(
            target: "trie::changeset_cache",
            start_block,
            end_block,
            db_tip_block,
            "Starting get_or_compute_range"
        );

        // Accumulate from the newest block down to the oldest, so that for nodes touched in
        // multiple blocks the oldest recorded value ends up in the overlay.
        let mut accumulated_reverts = TrieUpdatesSorted::default();

        for block_number in range.rev() {
            let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
                ProviderError::other(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("block hash not found for block number {block_number}"),
                ))
            })?;

            debug!(
                target: "trie::changeset_cache",
                block_number,
                ?block_hash,
                "Looked up block hash for block number in range"
            );

            let changesets = self.get_or_compute(block_hash, block_number, provider)?;

            accumulated_reverts.extend_ref_and_sort(&changesets);
        }

        let elapsed = timer.elapsed();

        let num_account_nodes = accumulated_reverts.account_nodes_ref().len();
        let num_storage_tries = accumulated_reverts.storage_tries_ref().len();

        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            start_block,
            end_block,
            num_blocks = end_block.saturating_sub(start_block).saturating_add(1),
            num_account_nodes,
            num_storage_tries,
            "Finished accumulating trie reverts for block range"
        );

        Ok(accumulated_reverts)
    }
}

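/// Inner state of [`ChangesetCache`], guarded by a [`RwLock`].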
#[derive(Debug)]
struct ChangesetCacheInner {
    /// Cached changesets keyed by block hash, together with the block number of each entry.
    entries: HashMap<B256, (u64, Arc<TrieUpdatesSorted>)>,

    /// Index from block number to the hashes cached for that number, used for eviction.
    block_numbers: BTreeMap<u64, Vec<B256>>,

    /// Cache hit, miss, and eviction metrics.
    #[cfg(feature = "metrics")]
    metrics: ChangesetCacheMetrics,
}

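/// Metrics for the changeset cache.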
#[cfg(feature = "metrics")]
#[derive(Metrics, Clone)]
#[metrics(scope = "trie.changeset_cache")]
struct ChangesetCacheMetrics {
    /// Number of cache hits.
    hits: Counter,

    /// Number of cache misses.
    misses: Counter,

    /// Number of entries evicted from the cache.
    evictions: Counter,

    /// Current number of entries in the cache.
    size: Gauge,
}

impl Default for ChangesetCacheInner {
    fn default() -> Self {
        Self::new()
    }
}

impl ChangesetCacheInner {
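    /// Creates an empty cache.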
    fn new() -> Self {
        Self {
            entries: HashMap::new(),
            block_numbers: BTreeMap::new(),
            #[cfg(feature = "metrics")]
            metrics: Default::default(),
        }
    }

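    /// Returns the cached changesets for the given block hash, updating the hit/miss metrics.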
    fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
        match self.entries.get(block_hash) {
            Some((_, changesets)) => {
                #[cfg(feature = "metrics")]
                self.metrics.hits.increment(1);
                Some(Arc::clone(changesets))
            }
            None => {
                #[cfg(feature = "metrics")]
                self.metrics.misses.increment(1);
                None
            }
        }
    }

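    /// Inserts an entry and updates the block number index and the size metric.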
    fn insert(&mut self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_before = self.entries.len(),
            "Inserting changeset into cache"
        );

        self.entries.insert(block_hash, (block_number, changesets));

        self.block_numbers.entry(block_number).or_default().push(block_hash);

        #[cfg(feature = "metrics")]
        self.metrics.size.set(self.entries.len() as f64);

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_after = self.entries.len(),
            "Changeset inserted into cache"
        );
    }

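    /// Removes all entries for blocks with a number strictly below `up_to_block`.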
    fn evict(&mut self, up_to_block: BlockNumber) {
        debug!(
            target: "trie::changeset_cache",
            up_to_block,
            cache_size_before = self.entries.len(),
            "Starting cache eviction"
        );

        // Collect the block numbers strictly below `up_to_block`; entries at `up_to_block`
        // itself are kept.
        let blocks_to_evict: Vec<u64> =
            self.block_numbers.range(..up_to_block).map(|(num, _)| *num).collect();

        let mut evicted_count = 0;

        for block_number in &blocks_to_evict {
            if let Some(hashes) = self.block_numbers.remove(block_number) {
                debug!(
                    target: "trie::changeset_cache",
                    block_number,
                    num_hashes = hashes.len(),
                    "Evicting block from cache"
                );
                for hash in hashes {
                    if self.entries.remove(&hash).is_some() {
                        evicted_count += 1;
                    }
                }
            }
        }

        debug!(
            target: "trie::changeset_cache",
            up_to_block,
            evicted_count,
            cache_size_after = self.entries.len(),
            "Finished cache eviction"
        );

        #[cfg(feature = "metrics")]
        if evicted_count > 0 {
            self.metrics.evictions.increment(evicted_count as u64);
            self.metrics.size.set(self.entries.len() as f64);
        }
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::map::B256Map;

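    /// Returns an empty set of trie updates for use as a cache value in tests.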
    fn create_test_changesets() -> Arc<TrieUpdatesSorted> {
        Arc::new(TrieUpdatesSorted::new(vec![], B256Map::default()))
    }

    #[test]
    fn test_insert_and_retrieve_single_entry() {
        let mut cache = ChangesetCacheInner::new();
        let hash = B256::random();
        let changesets = create_test_changesets();

        cache.insert(hash, 100, Arc::clone(&changesets));

        let retrieved = cache.get(&hash);
        assert!(retrieved.is_some());
        assert_eq!(cache.entries.len(), 1);
    }

    #[test]
    fn test_insert_multiple_entries() {
        let mut cache = ChangesetCacheInner::new();

        let mut hashes = Vec::new();
        for i in 0..10 {
            let hash = B256::random();
            cache.insert(hash, 100 + i, create_test_changesets());
            hashes.push(hash);
        }

        assert_eq!(cache.entries.len(), 10);
        for hash in &hashes {
            assert!(cache.get(hash).is_some());
        }
    }

    #[test]
    fn test_eviction_when_explicitly_called() {
        let mut cache = ChangesetCacheInner::new();

        let mut hashes = Vec::new();
        for i in 0..15 {
            let hash = B256::random();
            cache.insert(hash, i, create_test_changesets());
            hashes.push((i, hash));
        }

        assert_eq!(cache.entries.len(), 15);

        // Evicts blocks 0..=3; block 4 and above stay cached.
        cache.evict(4);

        assert_eq!(cache.entries.len(), 11);

        for i in 0..4 {
            assert!(cache.get(&hashes[i as usize].1).is_none(), "Block {} should be evicted", i);
        }

        for i in 4..15 {
            assert!(cache.get(&hashes[i as usize].1).is_some(), "Block {} should be present", i);
        }
    }

    #[test]
    fn test_eviction_with_persistence_watermark() {
        let mut cache = ChangesetCacheInner::new();

        let mut hashes = std::collections::HashMap::new();
        for i in 100..=165 {
            let hash = B256::random();
            cache.insert(hash, i, create_test_changesets());
            hashes.insert(i, hash);
        }

        assert_eq!(cache.entries.len(), 66);

        // Eviction is exclusive of `up_to_block`, so block 100 survives.
        cache.evict(100);

        assert_eq!(cache.entries.len(), 66);

        cache.evict(101);

        assert_eq!(cache.entries.len(), 65);
        assert!(cache.get(&hashes[&100]).is_none());
        assert!(cache.get(&hashes[&101]).is_some());
    }

    #[test]
    fn test_out_of_order_inserts_with_explicit_eviction() {
        let mut cache = ChangesetCacheInner::new();

        let hash_10 = B256::random();
        cache.insert(hash_10, 10, create_test_changesets());

        let hash_5 = B256::random();
        cache.insert(hash_5, 5, create_test_changesets());

        let hash_15 = B256::random();
        cache.insert(hash_15, 15, create_test_changesets());

        let hash_3 = B256::random();
        cache.insert(hash_3, 3, create_test_changesets());

        assert_eq!(cache.entries.len(), 4);

        cache.evict(5);

        assert!(cache.get(&hash_3).is_none(), "Block 3 should be evicted");
        assert!(cache.get(&hash_5).is_some(), "Block 5 should be present");
        assert!(cache.get(&hash_10).is_some(), "Block 10 should be present");
        assert!(cache.get(&hash_15).is_some(), "Block 15 should be present");
    }

    #[test]
    fn test_multiple_blocks_same_number() {
        let mut cache = ChangesetCacheInner::new();

        // Two competing blocks at the same height can be cached at once.
        let hash_1a = B256::random();
        let hash_1b = B256::random();
        cache.insert(hash_1a, 100, create_test_changesets());
        cache.insert(hash_1b, 100, create_test_changesets());

        assert!(cache.get(&hash_1a).is_some());
        assert!(cache.get(&hash_1b).is_some());
        assert_eq!(cache.entries.len(), 2);
    }

    #[test]
    fn test_eviction_removes_all_side_chains() {
        let mut cache = ChangesetCacheInner::new();

        // Three competing blocks at height 10 plus one block at height 20.
        let hash_10a = B256::random();
        let hash_10b = B256::random();
        let hash_10c = B256::random();
        cache.insert(hash_10a, 10, create_test_changesets());
        cache.insert(hash_10b, 10, create_test_changesets());
        cache.insert(hash_10c, 10, create_test_changesets());

        let hash_20 = B256::random();
        cache.insert(hash_20, 20, create_test_changesets());

        assert_eq!(cache.entries.len(), 4);

        cache.evict(15);

        assert_eq!(cache.entries.len(), 1);
        assert!(cache.get(&hash_10a).is_none());
        assert!(cache.get(&hash_10b).is_none());
        assert!(cache.get(&hash_10c).is_none());
        assert!(cache.get(&hash_20).is_some());
    }
}