use crate::{DatabaseStateRoot, DatabaseTrieCursorFactory};
use alloy_primitives::{map::B256Map, BlockNumber, B256};
use parking_lot::RwLock;
use reth_storage_api::{
    BlockNumReader, ChangeSetReader, DBProvider, StageCheckpointReader, StorageChangeSetReader,
    StorageSettingsCache,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_trie::{
    changesets::compute_trie_changesets,
    trie_cursor::{InMemoryTrieCursorFactory, TrieCursor, TrieCursorFactory},
    StateRoot, TrieInputSorted,
};
use reth_trie_common::updates::{StorageTrieUpdatesSorted, TrieUpdatesSorted};
use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc, time::Instant};
use tracing::debug;

#[cfg(feature = "metrics")]
use reth_metrics::{
    metrics::{Counter, Gauge},
    Metrics,
};

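/// Computes the trie changesets for the given historical block: for every trie node path touched
/// by that block, the node value as it was before the block was applied.
///
/// Both the pre-block and post-block trie views are reconstructed by replaying the state reverts
/// stored in the database on top of the current trie state.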
pub fn compute_block_trie_changesets<Provider>(
    provider: &Provider,
    block_number: BlockNumber,
) -> Result<TrieUpdatesSorted, ProviderError>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader
        + StorageSettingsCache,
{
    debug!(
        target: "trie::changeset_cache",
        block_number,
        "Computing block trie changesets from database state"
    );

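    // `individual_state_revert` covers `block_number` alone; `cumulative_state_revert` covers
    // every block after it, so overlaying it on current state yields the state as of
    // `block_number`.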
    let individual_state_revert =
        crate::state::from_reverts_auto(provider, block_number..=block_number)?;

    let cumulative_state_revert = crate::state::from_reverts_auto(provider, (block_number + 1)..)?;

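    // Extending the cumulative reverts with this block's own reverts yields the overlay for the
    // state as of the parent block (i.e. before `block_number` was applied).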
    let mut cumulative_state_revert_prev = cumulative_state_revert.clone();
    cumulative_state_revert_prev.extend_ref_and_sort(&individual_state_revert);

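    // First pass: compute the trie updates that transform the current database trie into the
    // trie as of the parent block.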
    let prefix_sets_prev = cumulative_state_revert_prev.construct_prefix_sets();
    let input_prev = TrieInputSorted::new(
        Arc::default(),
        Arc::new(cumulative_state_revert_prev),
        prefix_sets_prev,
    );

    let cumulative_trie_updates_prev =
        StateRoot::overlay_root_from_nodes_with_updates(provider.tx_ref(), input_prev)
            .map_err(ProviderError::other)?
            .1
            .into_sorted();

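    // Second pass: on top of the parent trie overlay, compute the trie updates produced by
    // `block_number` itself, restricted to the paths it touched.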
    let prefix_sets = individual_state_revert.construct_prefix_sets();

    let input = TrieInputSorted::new(
        Arc::new(cumulative_trie_updates_prev.clone()),
        Arc::new(cumulative_state_revert),
        prefix_sets,
    );

    let trie_updates = StateRoot::overlay_root_from_nodes_with_updates(provider.tx_ref(), input)
        .map_err(ProviderError::other)?
        .1
        .into_sorted();

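    // View the trie as of the parent block and record, for every path in `trie_updates`, the
    // node value it replaces. This is the changeset needed to revert the block.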
    let db_cursor_factory = DatabaseTrieCursorFactory::new(provider.tx_ref());
    let overlay_factory =
        InMemoryTrieCursorFactory::new(db_cursor_factory, &cumulative_trie_updates_prev);

    let changesets =
        compute_trie_changesets(&overlay_factory, &trie_updates).map_err(ProviderError::other)?;

    debug!(
        target: "trie::changeset_cache",
        block_number,
        num_account_nodes = changesets.account_nodes_ref().len(),
        num_storage_tries = changesets.storage_tries_ref().len(),
        "Computed block trie changesets successfully"
    );

    Ok(changesets)
}

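/// Recomputes the sorted trie updates produced by the given historical block.
///
/// The block's cached (or freshly computed) trie changesets provide the set of touched trie
/// paths; the accumulated changesets of all later blocks are overlaid on the database trie to
/// reconstruct the trie as of `block_number`, and each touched path is re-read from that view.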
pub fn compute_block_trie_updates<Provider>(
    cache: &ChangesetCache,
    provider: &Provider,
    block_number: BlockNumber,
) -> ProviderResult<TrieUpdatesSorted>
where
    Provider: DBProvider
        + StageCheckpointReader
        + ChangeSetReader
        + StorageChangeSetReader
        + BlockNumReader
        + StorageSettingsCache,
{
    let tx = provider.tx_ref();

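    // The `Finish` stage checkpoint is treated as the database tip, i.e. the highest block for
    // which changesets can be served.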
    let db_tip_block = provider
        .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
        .as_ref()
        .map(|chk| chk.block_number)
        .ok_or_else(|| ProviderError::InsufficientChangesets {
            requested: block_number,
            available: 0..=0,
        })?;

    let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
        ProviderError::other(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("block hash not found for block number {}", block_number),
        ))
    })?;

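    // Trie changesets of `block_number` itself: the set of trie paths the block touched.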
    let changesets = cache.get_or_compute(block_hash, block_number, provider)?;

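    // Accumulated changesets of every later block; overlaying them on the database trie
    // reconstructs the trie as it was right after `block_number` was applied.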
    let reverts = cache.get_or_compute_range(provider, (block_number + 1)..=db_tip_block)?;

    let db_cursor_factory = DatabaseTrieCursorFactory::new(tx);
    let cursor_factory = InMemoryTrieCursorFactory::new(db_cursor_factory, &reverts);

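    // Re-read every touched account trie path from the reverted view to obtain its post-block
    // value (`None` if the node does not exist at that point).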
    let account_nodes_ref = changesets.account_nodes_ref();
    let mut account_nodes = Vec::with_capacity(account_nodes_ref.len());
    let mut account_cursor = cursor_factory.account_trie_cursor()?;

    for (nibbles, _old_node) in account_nodes_ref {
        let node_value = account_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
        account_nodes.push((*nibbles, node_value));
    }

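    // Do the same for every storage trie touched by the block.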
    let mut storage_tries = B256Map::default();

    for (hashed_address, storage_changeset) in changesets.storage_tries_ref() {
        let mut storage_cursor = cursor_factory.storage_trie_cursor(*hashed_address)?;
        let storage_nodes_ref = storage_changeset.storage_nodes_ref();
        let mut storage_nodes = Vec::with_capacity(storage_nodes_ref.len());

        for (nibbles, _old_node) in storage_nodes_ref {
            let node_value = storage_cursor.seek_exact(*nibbles)?.map(|(_, node)| node);
            storage_nodes.push((*nibbles, node_value));
        }

        storage_tries.insert(
            *hashed_address,
            StorageTrieUpdatesSorted { storage_nodes, is_deleted: storage_changeset.is_deleted },
        );
    }

    Ok(TrieUpdatesSorted::new(account_nodes, storage_tries))
}

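/// A thread-safe, in-memory cache of per-block trie changesets, keyed by block hash.
///
/// Cloning is cheap: all clones share the same underlying storage. Entries stay in the cache
/// until [`Self::evict`] is called with a block number above them.
///
/// Illustrative sketch (not a doctest; `provider`, `block_hash`, `block_number`, and
/// `up_to_block` are placeholders):
///
/// ```ignore
/// let cache = ChangesetCache::new();
/// // Compute (or fetch) this block's changesets and cache them under its hash.
/// let changesets = cache.get_or_compute(block_hash, block_number, &provider)?;
/// // Drop entries for all blocks strictly below `up_to_block`.
/// cache.evict(up_to_block);
/// ```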
#[derive(Debug, Clone)]
pub struct ChangesetCache {
    inner: Arc<RwLock<ChangesetCacheInner>>,
}

impl Default for ChangesetCache {
    fn default() -> Self {
        Self::new()
    }
}

impl ChangesetCache {
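    /// Creates a new, empty changeset cache.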
    pub fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(ChangesetCacheInner::new())) }
    }

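    /// Returns the cached changesets for the given block hash, if present.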
    pub fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
        self.inner.read().get(block_hash)
    }

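    /// Inserts the changesets computed for the given block into the cache.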
    pub fn insert(&self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        self.inner.write().insert(block_hash, block_number, changesets)
    }

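    /// Evicts all entries for blocks strictly below `up_to_block`.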
    pub fn evict(&self, up_to_block: BlockNumber) {
        self.inner.write().evict(up_to_block)
    }

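    /// Returns the cached changesets for `block_hash`, computing and caching them from the
    /// database (via [`compute_block_trie_changesets`]) on a cache miss.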
    pub fn get_or_compute<P>(
        &self,
        block_hash: B256,
        block_number: u64,
        provider: &P,
    ) -> ProviderResult<Arc<TrieUpdatesSorted>>
    where
        P: DBProvider
            + StageCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader
            + StorageSettingsCache,
    {
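        // Fast path: check the cache under a read lock that is released before any computation.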
        {
            let cache = self.inner.read();
            if let Some(changesets) = cache.get(&block_hash) {
                debug!(
                    target: "trie::changeset_cache",
                    ?block_hash,
                    block_number,
                    "Changeset cache HIT"
                );
                return Ok(changesets);
            }
        }

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset cache MISS, computing from database"
        );

        let start = Instant::now();

        let changesets =
            compute_block_trie_changesets(provider, block_number).map_err(ProviderError::other)?;

        let changesets = Arc::new(changesets);
        let elapsed = start.elapsed();

        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            block_number,
            ?block_hash,
            "Changeset computed from database and inserting into cache"
        );

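        // Take the write lock only for the insertion itself.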
        {
            let mut cache = self.inner.write();
            cache.insert(block_hash, block_number, Arc::clone(&changesets));
        }

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            "Changeset successfully cached"
        );

        Ok(changesets)
    }

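    /// Accumulates the changesets of every block in `range` (newest to oldest) into a single
    /// [`TrieUpdatesSorted`] overlay, computing and caching missing entries along the way.
    ///
    /// Returns [`ProviderError::InsufficientChangesets`] if the range extends past the database
    /// tip as indicated by the `Finish` stage checkpoint.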
    pub fn get_or_compute_range<P>(
        &self,
        provider: &P,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<TrieUpdatesSorted>
    where
        P: DBProvider
            + StageCheckpointReader
            + ChangeSetReader
            + StorageChangeSetReader
            + BlockNumReader
            + StorageSettingsCache,
    {
        let db_tip_block = provider
            .get_stage_checkpoint(reth_stages_types::StageId::Finish)?
            .as_ref()
            .map(|chk| chk.block_number)
            .ok_or_else(|| ProviderError::InsufficientChangesets {
                requested: *range.start(),
                available: 0..=0,
            })?;

        let start_block = *range.start();
        let end_block = *range.end();

        if end_block > db_tip_block {
            return Err(ProviderError::InsufficientChangesets {
                requested: end_block,
                available: 0..=db_tip_block,
            });
        }

        let timer = Instant::now();

        debug!(
            target: "trie::changeset_cache",
            start_block,
            end_block,
            db_tip_block,
            "Starting get_or_compute_range"
        );

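        // Walk the range from newest to oldest, merging each block's changesets into a single
        // accumulated overlay.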
        let mut accumulated_reverts = TrieUpdatesSorted::default();

        for block_number in range.rev() {
            let block_hash = provider.block_hash(block_number)?.ok_or_else(|| {
                ProviderError::other(std::io::Error::new(
                    std::io::ErrorKind::NotFound,
                    format!("block hash not found for block number {}", block_number),
                ))
            })?;

            debug!(
                target: "trie::changeset_cache",
                block_number,
                ?block_hash,
                "Looked up block hash for block number in range"
            );

            let changesets = self.get_or_compute(block_hash, block_number, provider)?;

            accumulated_reverts.extend_ref_and_sort(&changesets);
        }

        let elapsed = timer.elapsed();

        let num_account_nodes = accumulated_reverts.account_nodes_ref().len();
        let num_storage_tries = accumulated_reverts.storage_tries_ref().len();

        debug!(
            target: "trie::changeset_cache",
            ?elapsed,
            start_block,
            end_block,
            num_blocks = end_block.saturating_sub(start_block).saturating_add(1),
            num_account_nodes,
            num_storage_tries,
            "Finished accumulating trie reverts for block range"
        );

        Ok(accumulated_reverts)
    }
}

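/// Shared interior of [`ChangesetCache`], guarded by the cache's `RwLock`.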
#[derive(Debug)]
struct ChangesetCacheInner {
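    /// Cached changesets keyed by block hash, together with the block number of each entry.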
    entries: B256Map<(u64, Arc<TrieUpdatesSorted>)>,

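    /// Index from block number to the block hashes cached at that height, used for eviction.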
    block_numbers: BTreeMap<u64, Vec<B256>>,

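    /// Metrics tracking cache hits, misses, evictions, and size.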
    #[cfg(feature = "metrics")]
    metrics: ChangesetCacheMetrics,
}

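/// Metrics for the [`ChangesetCache`].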
#[cfg(feature = "metrics")]
#[derive(Metrics, Clone)]
#[metrics(scope = "trie.changeset_cache")]
struct ChangesetCacheMetrics {
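    /// Number of cache hits.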
    hits: Counter,

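    /// Number of cache misses.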
    misses: Counter,

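    /// Number of entries evicted from the cache.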
    evictions: Counter,

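    /// Number of entries currently in the cache.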
    size: Gauge,
}

impl Default for ChangesetCacheInner {
    fn default() -> Self {
        Self::new()
    }
}

impl ChangesetCacheInner {
    fn new() -> Self {
        Self {
            entries: B256Map::default(),
            block_numbers: BTreeMap::new(),
            #[cfg(feature = "metrics")]
            metrics: Default::default(),
        }
    }

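    /// Looks up the changesets for `block_hash`, recording a cache hit or miss metric.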
    fn get(&self, block_hash: &B256) -> Option<Arc<TrieUpdatesSorted>> {
        match self.entries.get(block_hash) {
            Some((_, changesets)) => {
                #[cfg(feature = "metrics")]
                self.metrics.hits.increment(1);
                Some(Arc::clone(changesets))
            }
            None => {
                #[cfg(feature = "metrics")]
                self.metrics.misses.increment(1);
                None
            }
        }
    }

    fn insert(&mut self, block_hash: B256, block_number: u64, changesets: Arc<TrieUpdatesSorted>) {
        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_before = self.entries.len(),
            "Inserting changeset into cache"
        );

        self.entries.insert(block_hash, (block_number, changesets));

        self.block_numbers.entry(block_number).or_default().push(block_hash);

        #[cfg(feature = "metrics")]
        self.metrics.size.set(self.entries.len() as f64);

        debug!(
            target: "trie::changeset_cache",
            ?block_hash,
            block_number,
            cache_size_after = self.entries.len(),
            "Changeset inserted into cache"
        );
    }

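    /// Removes all entries for blocks strictly below `up_to_block` (the bound itself is kept).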
    fn evict(&mut self, up_to_block: BlockNumber) {
        debug!(
            target: "trie::changeset_cache",
            up_to_block,
            cache_size_before = self.entries.len(),
            "Starting cache eviction"
        );

        let blocks_to_evict: Vec<u64> =
            self.block_numbers.range(..up_to_block).map(|(num, _)| *num).collect();

        // Count removed entries; also reported via metrics when the feature is enabled.
        let mut evicted_count = 0;

        for block_number in &blocks_to_evict {
            if let Some(hashes) = self.block_numbers.remove(block_number) {
                debug!(
                    target: "trie::changeset_cache",
                    block_number,
                    num_hashes = hashes.len(),
                    "Evicting block from cache"
                );
                for hash in hashes {
                    if self.entries.remove(&hash).is_some() {
                        evicted_count += 1;
                    }
                }
            }
        }

        debug!(
            target: "trie::changeset_cache",
            up_to_block,
            evicted_count,
            cache_size_after = self.entries.len(),
            "Finished cache eviction"
        );

        #[cfg(feature = "metrics")]
        if evicted_count > 0 {
            self.metrics.evictions.increment(evicted_count as u64);
            self.metrics.size.set(self.entries.len() as f64);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::map::{B256Map, HashMap};

    fn create_test_changesets() -> Arc<TrieUpdatesSorted> {
        Arc::new(TrieUpdatesSorted::new(vec![], B256Map::default()))
    }

    #[test]
    fn test_insert_and_retrieve_single_entry() {
        let mut cache = ChangesetCacheInner::new();
        let hash = B256::random();
        let changesets = create_test_changesets();

        cache.insert(hash, 100, Arc::clone(&changesets));

        let retrieved = cache.get(&hash);
        assert!(retrieved.is_some());
        assert_eq!(cache.entries.len(), 1);
    }

    #[test]
    fn test_insert_multiple_entries() {
        let mut cache = ChangesetCacheInner::new();

        let mut hashes = Vec::new();
        for i in 0..10 {
            let hash = B256::random();
            cache.insert(hash, 100 + i, create_test_changesets());
            hashes.push(hash);
        }

        assert_eq!(cache.entries.len(), 10);
        for hash in &hashes {
            assert!(cache.get(hash).is_some());
        }
    }

    #[test]
    fn test_eviction_when_explicitly_called() {
        let mut cache = ChangesetCacheInner::new();

        let mut hashes = Vec::new();
        for i in 0..15 {
            let hash = B256::random();
            cache.insert(hash, i, create_test_changesets());
            hashes.push((i, hash));
        }

        assert_eq!(cache.entries.len(), 15);

        cache.evict(4);

        assert_eq!(cache.entries.len(), 11);

        for i in 0..4 {
            assert!(cache.get(&hashes[i as usize].1).is_none(), "Block {} should be evicted", i);
        }

        for i in 4..15 {
            assert!(cache.get(&hashes[i as usize].1).is_some(), "Block {} should be present", i);
        }
    }

    #[test]
    fn test_eviction_with_persistence_watermark() {
        let mut cache = ChangesetCacheInner::new();

        let mut hashes = HashMap::new();
        for i in 100..=165 {
            let hash = B256::random();
            cache.insert(hash, i, create_test_changesets());
            hashes.insert(i, hash);
        }

        assert_eq!(cache.entries.len(), 66);

        cache.evict(100);

        assert_eq!(cache.entries.len(), 66);

        cache.evict(101);

        assert_eq!(cache.entries.len(), 65);
        assert!(cache.get(&hashes[&100]).is_none());
        assert!(cache.get(&hashes[&101]).is_some());
    }

    #[test]
    fn test_out_of_order_inserts_with_explicit_eviction() {
        let mut cache = ChangesetCacheInner::new();

        let hash_10 = B256::random();
        cache.insert(hash_10, 10, create_test_changesets());

        let hash_5 = B256::random();
        cache.insert(hash_5, 5, create_test_changesets());

        let hash_15 = B256::random();
        cache.insert(hash_15, 15, create_test_changesets());

        let hash_3 = B256::random();
        cache.insert(hash_3, 3, create_test_changesets());

        assert_eq!(cache.entries.len(), 4);

        cache.evict(5);

        assert!(cache.get(&hash_3).is_none(), "Block 3 should be evicted");
        assert!(cache.get(&hash_5).is_some(), "Block 5 should be present");
        assert!(cache.get(&hash_10).is_some(), "Block 10 should be present");
        assert!(cache.get(&hash_15).is_some(), "Block 15 should be present");
    }

    #[test]
    fn test_multiple_blocks_same_number() {
        let mut cache = ChangesetCacheInner::new();

        let hash_1a = B256::random();
        let hash_1b = B256::random();
        cache.insert(hash_1a, 100, create_test_changesets());
        cache.insert(hash_1b, 100, create_test_changesets());

        assert!(cache.get(&hash_1a).is_some());
        assert!(cache.get(&hash_1b).is_some());
        assert_eq!(cache.entries.len(), 2);
    }

    #[test]
    fn test_eviction_removes_all_side_chains() {
        let mut cache = ChangesetCacheInner::new();

        let hash_10a = B256::random();
        let hash_10b = B256::random();
        let hash_10c = B256::random();
        cache.insert(hash_10a, 10, create_test_changesets());
        cache.insert(hash_10b, 10, create_test_changesets());
        cache.insert(hash_10c, 10, create_test_changesets());

        let hash_20 = B256::random();
        cache.insert(hash_20, 20, create_test_changesets());

        assert_eq!(cache.entries.len(), 4);

        cache.evict(15);

        assert_eq!(cache.entries.len(), 1);
        assert!(cache.get(&hash_10a).is_none());
        assert!(cache.get(&hash_10b).is_none());
        assert!(cache.get(&hash_10c).is_none());
        assert!(cache.get(&hash_20).is_some());
    }
}